]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_sync_module_pubsub.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rgw / rgw_sync_module_pubsub.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
4#include "services/svc_zone.h"
11fdf7f2
TL
5#include "rgw_common.h"
6#include "rgw_coroutine.h"
7#include "rgw_sync_module.h"
8#include "rgw_data_sync.h"
9#include "rgw_sync_module_pubsub.h"
10#include "rgw_sync_module_pubsub_rest.h"
11#include "rgw_rest_conn.h"
12#include "rgw_cr_rados.h"
13#include "rgw_cr_rest.h"
14#include "rgw_cr_tools.h"
15#include "rgw_op.h"
16#include "rgw_pubsub.h"
17#include "rgw_pubsub_push.h"
eafe8130 18#include "rgw_notify_event_type.h"
11fdf7f2
TL
19#include "rgw_perf_counters.h"
20
eafe8130 21#include <boost/algorithm/hex.hpp>
11fdf7f2
TL
22#include <boost/asio/yield.hpp>
23
24#define dout_subsys ceph_subsys_rgw
25
26
27#define PUBSUB_EVENTS_RETENTION_DEFAULT 7
28
29/*
30
31config:
32
33{
34 "tenant": <tenant>, # default: <empty>
35 "uid": <uid>, # default: "pubsub"
36 "data_bucket_prefix": <prefix> # default: "pubsub-"
37 "data_oid_prefix": <prefix> #
eafe8130
TL
38 "events_retention_days": <int> # default: 7
39 "start_with_full_sync": <bool> # default: false
11fdf7f2
TL
40}
41
42*/
43
44// utility function to convert the args list from string format
45// (ampresend separated with equal sign) to prased structure
46RGWHTTPArgs string_to_args(const std::string& str_args) {
47 RGWHTTPArgs args;
48 args.set(str_args);
49 args.parse();
50 return args;
51}
52
eafe8130
TL
53struct PSSubConfig {
54 std::string name;
55 std::string topic;
56 std::string push_endpoint_name;
57 std::string push_endpoint_args;
58 std::string data_bucket_name;
59 std::string data_oid_prefix;
60 std::string s3_id;
61 std::string arn_topic;
11fdf7f2
TL
62 RGWPubSubEndpoint::Ptr push_endpoint;
63
11fdf7f2
TL
64 void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc) {
65 name = uc.name;
66 topic = uc.topic;
67 push_endpoint_name = uc.dest.push_endpoint;
68 data_bucket_name = uc.dest.bucket_name;
69 data_oid_prefix = uc.dest.oid_prefix;
eafe8130
TL
70 s3_id = uc.s3_id;
71 arn_topic = uc.dest.arn_topic;
72 if (!push_endpoint_name.empty()) {
11fdf7f2
TL
73 push_endpoint_args = uc.dest.push_endpoint_args;
74 try {
eafe8130
TL
75 push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
76 ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
11fdf7f2 77 } catch (const RGWPubSubEndpoint::configuration_error& e) {
eafe8130 78 ldout(cct, 1) << "ERROR: failed to create push endpoint: "
11fdf7f2
TL
79 << push_endpoint_name << " due to: " << e.what() << dendl;
80 }
81 }
82 }
83
84 void dump(Formatter *f) const {
85 encode_json("name", name, f);
86 encode_json("topic", topic, f);
87 encode_json("push_endpoint", push_endpoint_name, f);
eafe8130 88 encode_json("push_endpoint_args", push_endpoint_args, f);
11fdf7f2
TL
89 encode_json("data_bucket_name", data_bucket_name, f);
90 encode_json("data_oid_prefix", data_oid_prefix, f);
eafe8130 91 encode_json("s3_id", s3_id, f);
11fdf7f2
TL
92 }
93
11fdf7f2
TL
94};
95
96using PSSubConfigRef = std::shared_ptr<PSSubConfig>;
97
98struct PSTopicConfig {
eafe8130
TL
99 std::string name;
100 std::set<std::string> subs;
9f95a23c 101 std::string opaque_data;
11fdf7f2
TL
102
103 void dump(Formatter *f) const {
104 encode_json("name", name, f);
105 encode_json("subs", subs, f);
9f95a23c 106 encode_json("opaque", opaque_data, f);
11fdf7f2
TL
107 }
108};
109
110struct PSNotificationConfig {
111 uint64_t id{0};
112 string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
113 string topic;
114 bool is_prefix{false};
115
116
117 void dump(Formatter *f) const {
118 encode_json("id", id, f);
119 encode_json("path", path, f);
120 encode_json("topic", topic, f);
121 encode_json("is_prefix", is_prefix, f);
122 }
123
124 void init(CephContext *cct, const JSONFormattable& config) {
125 path = config["path"];
126 if (!path.empty() && path[path.size() - 1] == '*') {
127 path = path.substr(0, path.size() - 1);
128 is_prefix = true;
129 }
130 topic = config["topic"];
131 }
132};
133
134template<class T>
135static string json_str(const char *name, const T& obj, bool pretty = false)
136{
137 stringstream ss;
138 JSONFormatter f(pretty);
139
140 encode_json(name, obj, &f);
141 f.flush(ss);
142
143 return ss.str();
144}
145
146using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
eafe8130 147using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
11fdf7f2 148
f67539c2 149// global pubsub configuration
11fdf7f2 150struct PSConfig {
9f95a23c 151 const std::string id{"pubsub"};
11fdf7f2 152 rgw_user user;
9f95a23c
TL
153 std::string data_bucket_prefix;
154 std::string data_oid_prefix;
11fdf7f2 155 int events_retention_days{0};
11fdf7f2 156 uint64_t sync_instance{0};
eafe8130 157 bool start_with_full_sync{false};
11fdf7f2
TL
158
159 void dump(Formatter *f) const {
160 encode_json("id", id, f);
161 encode_json("user", user, f);
162 encode_json("data_bucket_prefix", data_bucket_prefix, f);
163 encode_json("data_oid_prefix", data_oid_prefix, f);
164 encode_json("events_retention_days", events_retention_days, f);
165 encode_json("sync_instance", sync_instance, f);
eafe8130 166 encode_json("start_with_full_sync", start_with_full_sync, f);
11fdf7f2
TL
167 }
168
169 void init(CephContext *cct, const JSONFormattable& config) {
170 string uid = config["uid"]("pubsub");
171 user = rgw_user(config["tenant"], uid);
172 data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
173 data_oid_prefix = config["data_oid_prefix"];
174 events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
eafe8130 175 start_with_full_sync = config["start_with_full_sync"](false);
11fdf7f2 176
f67539c2 177 ldout(cct, 20) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
11fdf7f2
TL
178 }
179
180 void init_instance(const RGWRealm& realm, uint64_t instance_id) {
181 sync_instance = instance_id;
182 }
11fdf7f2
TL
183};
184
11fdf7f2 185using PSConfigRef = std::shared_ptr<PSConfig>;
eafe8130
TL
186template<typename EventType>
187using EventRef = std::shared_ptr<EventType>;
11fdf7f2
TL
188
189struct objstore_event {
190 string id;
191 const rgw_bucket& bucket;
192 const rgw_obj_key& key;
193 const ceph::real_time& mtime;
194 const std::vector<std::pair<std::string, std::string> > *attrs;
195
196 objstore_event(const rgw_bucket& _bucket,
197 const rgw_obj_key& _key,
198 const ceph::real_time& _mtime,
199 const std::vector<std::pair<std::string, std::string> > *_attrs) : bucket(_bucket),
200 key(_key),
201 mtime(_mtime),
202 attrs(_attrs) {}
203
204 string get_hash() {
205 string etag;
206 RGWMD5Etag hash;
207 hash.update(bucket.bucket_id);
208 hash.update(key.name);
209 hash.update(key.instance);
210 hash.finish(&etag);
211
212 assert(etag.size() > 8);
213
214 return etag.substr(0, 8);
215 }
216
217 void dump(Formatter *f) const {
218 {
219 Formatter::ObjectSection s(*f, "bucket");
220 encode_json("name", bucket.name, f);
221 encode_json("tenant", bucket.tenant, f);
222 encode_json("bucket_id", bucket.bucket_id, f);
223 }
224 {
225 Formatter::ObjectSection s(*f, "key");
226 encode_json("name", key.name, f);
227 encode_json("instance", key.instance, f);
228 }
229 utime_t mt(mtime);
230 encode_json("mtime", mt, f);
231 Formatter::ObjectSection s(*f, "attrs");
232 if (attrs) {
233 for (auto& attr : *attrs) {
234 encode_json(attr.first.c_str(), attr.second.c_str(), f);
235 }
236 }
237 }
238};
239
240static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
241 const rgw_obj_key& key,
242 const ceph::real_time& mtime,
243 const std::vector<std::pair<std::string, std::string> > *attrs,
eafe8130
TL
244 rgw::notify::EventType event_type,
245 EventRef<rgw_pubsub_event> *event) {
11fdf7f2
TL
246 *event = std::make_shared<rgw_pubsub_event>();
247
eafe8130
TL
248 EventRef<rgw_pubsub_event>& e = *event;
249 e->event_name = rgw::notify::to_ceph_string(event_type);
11fdf7f2
TL
250 e->source = bucket.name + "/" + key.name;
251 e->timestamp = real_clock::now();
252
253 objstore_event oevent(bucket, key, mtime, attrs);
254
eafe8130
TL
255 const utime_t ts(e->timestamp);
256 set_event_id(e->id, oevent.get_hash(), ts);
11fdf7f2
TL
257
258 encode_json("info", oevent, &e->info);
259}
260
f67539c2 261static void make_s3_event_ref(CephContext *cct, const rgw_bucket& bucket,
eafe8130
TL
262 const rgw_user& owner,
263 const rgw_obj_key& key,
264 const ceph::real_time& mtime,
f67539c2 265 const std::vector<std::pair<std::string, std::string>>* attrs,
eafe8130 266 rgw::notify::EventType event_type,
f67539c2
TL
267 EventRef<rgw_pubsub_s3_event>* event) {
268 *event = std::make_shared<rgw_pubsub_s3_event>();
eafe8130 269
f67539c2
TL
270 EventRef<rgw_pubsub_s3_event>& e = *event;
271 e->eventTime = mtime;
272 e->eventName = rgw::notify::to_string(event_type);
92f5a8d4
TL
273 // userIdentity: not supported in sync module
274 // x_amz_request_id: not supported in sync module
275 // x_amz_id_2: not supported in sync module
eafe8130 276 // configurationId is filled from subscription configuration
f67539c2
TL
277 e->bucket_name = bucket.name;
278 e->bucket_ownerIdentity = owner.to_str();
279 e->bucket_arn = to_string(rgw::ARN(bucket));
280 e->bucket_id = bucket.bucket_id; // rgw extension
281 e->object_key = key.name;
92f5a8d4 282 // object_size not supported in sync module
eafe8130 283 objstore_event oevent(bucket, key, mtime, attrs);
f67539c2
TL
284 e->object_etag = oevent.get_hash();
285 e->object_versionId = key.instance;
eafe8130
TL
286
287 // use timestamp as per key sequence id (hex encoded)
288 const utime_t ts(real_clock::now());
289 boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
f67539c2 290 std::back_inserter(e->object_sequencer));
eafe8130 291
f67539c2 292 set_event_id(e->id, e->object_etag, ts);
eafe8130
TL
293}
294
11fdf7f2
TL
295class PSManager;
296using PSManagerRef = std::shared_ptr<PSManager>;
297
298struct PSEnv {
299 PSConfigRef conf;
300 shared_ptr<RGWUserInfo> data_user_info;
301 PSManagerRef manager;
302
303 PSEnv() : conf(make_shared<PSConfig>()),
304 data_user_info(make_shared<RGWUserInfo>()) {}
305
306 void init(CephContext *cct, const JSONFormattable& config) {
307 conf->init(cct, config);
308 }
309
310 void init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr);
311};
312
313using PSEnvRef = std::shared_ptr<PSEnv>;
314
eafe8130 315template<typename EventType>
11fdf7f2 316class PSEvent {
eafe8130 317 const EventRef<EventType> event;
11fdf7f2
TL
318
319public:
eafe8130 320 PSEvent(const EventRef<EventType>& _event) : event(_event) {}
11fdf7f2
TL
321
322 void format(bufferlist *bl) const {
92f5a8d4 323 bl->append(json_str("", *event));
11fdf7f2
TL
324 }
325
326 void encode_event(bufferlist& bl) const {
327 encode(*event, bl);
328 }
329
330 const string& id() const {
331 return event->id;
332 }
333};
334
335template <class T>
336class RGWSingletonCR : public RGWCoroutine {
337 friend class WrapperCR;
338
339 boost::asio::coroutine wrapper_state;
340 bool started{false};
341 int operate_ret{0};
342
343 struct WaiterInfo {
344 RGWCoroutine *cr{nullptr};
345 T *result;
346 };
347 using WaiterInfoRef = std::shared_ptr<WaiterInfo>;
348
349 deque<WaiterInfoRef> waiters;
350
351 void add_waiter(RGWCoroutine *cr, T *result) {
352 auto waiter = std::make_shared<WaiterInfo>();
353 waiter->cr = cr;
354 waiter->result = result;
355 waiters.push_back(waiter);
356 };
357
358 bool get_next_waiter(WaiterInfoRef *waiter) {
359 if (waiters.empty()) {
360 waiter->reset();
361 return false;
362 }
363
364 *waiter = waiters.front();
365 waiters.pop_front();
366 return true;
367 }
368
369 int operate_wrapper() override {
370 reenter(&wrapper_state) {
371 while (!is_done()) {
372 ldout(cct, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl;
373 operate_ret = operate();
374 if (operate_ret < 0) {
375 ldout(cct, 20) << *this << ": operate() returned r=" << operate_ret << dendl;
376 }
377 if (!is_done()) {
378 yield;
379 }
380 }
381
382 ldout(cct, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl;
383 /* we're done, can't yield anymore */
384
385 WaiterInfoRef waiter;
386 while (get_next_waiter(&waiter)) {
387 ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl;
388 waiter->cr->set_retcode(retcode);
389 waiter->cr->set_sleeping(false);
390 return_result(waiter->result);
391 put();
392 }
393
394 return retcode;
395 }
396 return 0;
397 }
398
399 virtual void return_result(T *result) {}
400
401public:
402 RGWSingletonCR(CephContext *_cct)
403 : RGWCoroutine(_cct) {}
404
405 int execute(RGWCoroutine *caller, T *result = nullptr) {
406 if (!started) {
407 ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl;
408 started = true;
409 caller->call(this);
410 return 0;
411 } else if (!is_done()) {
412 ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl;
413 get();
414 add_waiter(caller, result);
415 caller->set_sleeping(true);
416 return 0;
417 }
418
419 ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl;
420 caller->set_retcode(retcode);
421 return_result(result);
422 return retcode;
423 }
424};
425
426
427class PSSubscription;
428using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
429
430class PSSubscription {
431 class InitCR;
432 friend class InitCR;
433 friend class RGWPSHandleObjEventCR;
434
9f95a23c 435 RGWDataSyncCtx *sc;
11fdf7f2
TL
436 RGWDataSyncEnv *sync_env;
437 PSEnvRef env;
438 PSSubConfigRef sub_conf;
eafe8130 439 std::shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result;
11fdf7f2
TL
440 RGWBucketInfo *bucket_info{nullptr};
441 RGWDataAccessRef data_access;
442 RGWDataAccess::BucketRef bucket;
443
11fdf7f2
TL
444 InitCR *init_cr{nullptr};
445
446 class InitBucketLifecycleCR : public RGWCoroutine {
9f95a23c 447 RGWDataSyncCtx *sc;
11fdf7f2
TL
448 RGWDataSyncEnv *sync_env;
449 PSConfigRef& conf;
450 LCRule rule;
451
452 int retention_days;
453
454 rgw_bucket_lifecycle_config_params lc_config;
455
456 public:
9f95a23c 457 InitBucketLifecycleCR(RGWDataSyncCtx *_sc,
11fdf7f2
TL
458 PSConfigRef& _conf,
459 RGWBucketInfo& _bucket_info,
9f95a23c
TL
460 std::map<string, bufferlist>& _bucket_attrs) : RGWCoroutine(_sc->cct),
461 sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
462 conf(_conf) {
463 lc_config.bucket_info = _bucket_info;
464 lc_config.bucket_attrs = _bucket_attrs;
465 retention_days = conf->events_retention_days;
466 }
467
468 int operate() override {
469 reenter(this) {
470
471 rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days);
472
473 {
474 /* maybe we already have it configured? */
475 RGWLifecycleConfiguration old_config;
476 auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC);
477 if (aiter != lc_config.bucket_attrs.end()) {
478 bufferlist::const_iterator iter{&aiter->second};
479 try {
480 old_config.decode(iter);
481 } catch (const buffer::error& e) {
9f95a23c 482 ldpp_dout(sync_env->dpp, 0) << __func__ << "(): decode life cycle config failed" << dendl;
11fdf7f2
TL
483 }
484 }
485
486 auto old_rules = old_config.get_rule_map();
487 for (auto ori : old_rules) {
488 auto& old_rule = ori.second;
489
490 if (old_rule.get_prefix().empty() &&
491 old_rule.get_expiration().get_days() == retention_days &&
492 old_rule.is_enabled()) {
9f95a23c 493 ldpp_dout(sync_env->dpp, 20) << "no need to set lifecycle rule on bucket, existing rule matches config" << dendl;
11fdf7f2
TL
494 return set_cr_done();
495 }
496 }
497 }
498
499 lc_config.config.add_rule(rule);
500 yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados,
501 sync_env->store,
9f95a23c
TL
502 lc_config,
503 sync_env->dpp));
11fdf7f2 504 if (retcode < 0) {
9f95a23c 505 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
11fdf7f2
TL
506 return set_cr_error(retcode);
507 }
508
509 return set_cr_done();
510 }
511 return 0;
512 }
513 };
514
515 class InitCR : public RGWSingletonCR<bool> {
9f95a23c 516 RGWDataSyncCtx *sc;
11fdf7f2
TL
517 RGWDataSyncEnv *sync_env;
518 PSSubscriptionRef sub;
519 rgw_get_bucket_info_params get_bucket_info;
520 rgw_bucket_create_local_params create_bucket;
521 PSConfigRef& conf;
522 PSSubConfigRef& sub_conf;
523 int i;
524
525 public:
9f95a23c
TL
526 InitCR(RGWDataSyncCtx *_sc,
527 PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sc->cct),
528 sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
529 sub(_sub), conf(sub->env->conf),
530 sub_conf(sub->sub_conf) {
531 }
532
533 int operate() override {
534 reenter(this) {
535 get_bucket_info.tenant = conf->user.tenant;
536 get_bucket_info.bucket_name = sub_conf->data_bucket_name;
537 sub->get_bucket_info_result = make_shared<rgw_get_bucket_info_result>();
538
539 for (i = 0; i < 2; ++i) {
540 yield call(new RGWGetBucketInfoCR(sync_env->async_rados,
541 sync_env->store,
542 get_bucket_info,
543 sub->get_bucket_info_result));
544 if (retcode < 0 && retcode != -ENOENT) {
9f95a23c 545 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to geting bucket info: " << "tenant="
11fdf7f2
TL
546 << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
547 }
548 if (retcode == 0) {
549 {
550 auto& result = sub->get_bucket_info_result;
551 sub->bucket_info = &result->bucket_info;
552
553 int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket);
554 if (ret < 0) {
9f95a23c 555 ldpp_dout(sync_env->dpp, 1) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
11fdf7f2
TL
556 return set_cr_error(ret);
557 }
558 }
559
9f95a23c 560 yield call(new InitBucketLifecycleCR(sc, conf,
11fdf7f2
TL
561 sub->get_bucket_info_result->bucket_info,
562 sub->get_bucket_info_result->attrs));
563 if (retcode < 0) {
9f95a23c 564 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
11fdf7f2
TL
565 return set_cr_error(retcode);
566 }
567
568 return set_cr_done();
569 }
570
571 create_bucket.user_info = sub->env->data_user_info;
572 create_bucket.bucket_name = sub_conf->data_bucket_name;
9f95a23c 573 ldpp_dout(sync_env->dpp, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
11fdf7f2
TL
574 yield call(new RGWBucketCreateLocalCR(sync_env->async_rados,
575 sync_env->store,
9f95a23c
TL
576 create_bucket,
577 sync_env->dpp));
11fdf7f2 578 if (retcode < 0) {
9f95a23c 579 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create bucket: " << "tenant="
11fdf7f2
TL
580 << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
581 return set_cr_error(retcode);
582 }
583
584 /* second iteration: we got -ENOENT and created a bucket */
585 }
586
587 /* failed twice on -ENOENT, unexpected */
9f95a23c 588 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant
11fdf7f2
TL
589 << " name=" << get_bucket_info.bucket_name << dendl;
590 return set_cr_error(-EIO);
591 }
592 return 0;
593 }
594 };
595
eafe8130 596 template<typename EventType>
11fdf7f2 597 class StoreEventCR : public RGWCoroutine {
9f95a23c 598 RGWDataSyncCtx* const sc;
11fdf7f2
TL
599 RGWDataSyncEnv* const sync_env;
600 const PSSubscriptionRef sub;
eafe8130 601 const PSEvent<EventType> pse;
11fdf7f2
TL
602 const string oid_prefix;
603
604 public:
9f95a23c 605 StoreEventCR(RGWDataSyncCtx* const _sc,
11fdf7f2 606 const PSSubscriptionRef& _sub,
9f95a23c
TL
607 const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct),
608 sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
609 sub(_sub),
610 pse(_event),
611 oid_prefix(sub->sub_conf->data_oid_prefix) {
612 }
613
614 int operate() override {
11fdf7f2
TL
615 rgw_object_simple_put_params put_obj;
616 reenter(this) {
617
618 put_obj.bucket = sub->bucket;
619 put_obj.key = rgw_obj_key(oid_prefix + pse.id());
620
621 pse.format(&put_obj.data);
622
623 {
624 bufferlist bl;
625 pse.encode_event(bl);
626 bufferlist bl64;
627 bl.encode_base64(bl64);
628 put_obj.user_data = bl64.to_str();
629 }
630
631 yield call(new RGWObjectSimplePutCR(sync_env->async_rados,
632 sync_env->store,
9f95a23c
TL
633 put_obj,
634 sync_env->dpp));
11fdf7f2 635 if (retcode < 0) {
eafe8130 636 ldpp_dout(sync_env->dpp, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
11fdf7f2
TL
637 return set_cr_error(retcode);
638 } else {
eafe8130 639 ldpp_dout(sync_env->dpp, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl;
11fdf7f2
TL
640 }
641
642 return set_cr_done();
643 }
644 return 0;
645 }
646 };
647
eafe8130 648 template<typename EventType>
11fdf7f2 649 class PushEventCR : public RGWCoroutine {
9f95a23c 650 RGWDataSyncCtx* const sc;
11fdf7f2 651 RGWDataSyncEnv* const sync_env;
eafe8130 652 const EventRef<EventType> event;
11fdf7f2
TL
653 const PSSubConfigRef& sub_conf;
654
655 public:
9f95a23c 656 PushEventCR(RGWDataSyncCtx* const _sc,
11fdf7f2 657 const PSSubscriptionRef& _sub,
9f95a23c
TL
658 const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct),
659 sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
660 event(_event),
661 sub_conf(_sub->sub_conf) {
662 }
663
664 int operate() override {
665 reenter(this) {
666 ceph_assert(sub_conf->push_endpoint);
667 yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env));
668
669 if (retcode < 0) {
eafe8130 670 ldout(sync_env->cct, 10) << "failed to push event: " << event->id <<
11fdf7f2 671 " to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl;
11fdf7f2
TL
672 return set_cr_error(retcode);
673 }
674
eafe8130 675 ldout(sync_env->cct, 20) << "event: " << event->id <<
11fdf7f2 676 " pushed to endpoint: " << sub_conf->push_endpoint_name << dendl;
11fdf7f2
TL
677 return set_cr_done();
678 }
679 return 0;
680 }
681 };
682
683public:
9f95a23c 684 PSSubscription(RGWDataSyncCtx *_sc,
11fdf7f2 685 PSEnvRef _env,
9f95a23c 686 PSSubConfigRef& _sub_conf) : sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
687 env(_env),
688 sub_conf(_sub_conf),
689 data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {}
690
9f95a23c 691 PSSubscription(RGWDataSyncCtx *_sc,
11fdf7f2 692 PSEnvRef _env,
9f95a23c 693 rgw_pubsub_sub_config& user_sub_conf) : sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
694 env(_env),
695 sub_conf(std::make_shared<PSSubConfig>()),
696 data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {
697 sub_conf->from_user_conf(sync_env->cct, user_sub_conf);
698 }
699 virtual ~PSSubscription() {
700 if (init_cr) {
701 init_cr->put();
702 }
703 }
704
705 template <class C>
9f95a23c 706 static PSSubscriptionRef get_shared(RGWDataSyncCtx *_sc,
11fdf7f2
TL
707 PSEnvRef _env,
708 C& _sub_conf) {
9f95a23c
TL
709 auto sub = std::make_shared<PSSubscription>(_sc, _env, _sub_conf);
710 sub->init_cr = new InitCR(_sc, sub);
11fdf7f2
TL
711 sub->init_cr->get();
712 return sub;
713 }
714
715 int call_init_cr(RGWCoroutine *caller) {
716 return init_cr->execute(caller);
717 }
718
eafe8130 719 template<typename EventType>
9f95a23c
TL
720 static RGWCoroutine *store_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
721 return new StoreEventCR<EventType>(sc, sub, event);
11fdf7f2
TL
722 }
723
eafe8130 724 template<typename EventType>
9f95a23c
TL
725 static RGWCoroutine *push_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
726 return new PushEventCR<EventType>(sc, sub, event);
11fdf7f2
TL
727 }
728 friend class InitCR;
729};
730
11fdf7f2
TL
731class PSManager
732{
9f95a23c 733 RGWDataSyncCtx *sc;
11fdf7f2
TL
734 RGWDataSyncEnv *sync_env;
735 PSEnvRef env;
736
eafe8130 737 std::map<string, PSSubscriptionRef> subs;
11fdf7f2
TL
738
739 class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> {
9f95a23c 740 RGWDataSyncCtx *sc;
11fdf7f2
TL
741 RGWDataSyncEnv *sync_env;
742 PSManagerRef mgr;
743 rgw_user owner;
744 string sub_name;
745 string sub_id;
746 PSSubscriptionRef *ref;
747
748 PSConfigRef conf;
749
750 PSSubConfigRef sub_conf;
751 rgw_pubsub_sub_config user_sub_conf;
eafe8130 752
11fdf7f2 753 public:
9f95a23c 754 GetSubCR(RGWDataSyncCtx *_sc,
11fdf7f2
TL
755 PSManagerRef& _mgr,
756 const rgw_user& _owner,
757 const string& _sub_name,
9f95a23c
TL
758 PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sc->cct),
759 sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
760 mgr(_mgr),
761 owner(_owner),
762 sub_name(_sub_name),
763 ref(_ref),
764 conf(mgr->env->conf) {
765 }
eafe8130 766 ~GetSubCR() { }
11fdf7f2
TL
767
768 int operate() override {
769 reenter(this) {
770 if (owner.empty()) {
f67539c2 771 ldout(sync_env->cct, 1) << "ERROR: missing user info when getting subscription: " << sub_name << dendl;
11fdf7f2 772 mgr->remove_get_sub(owner, sub_name);
f67539c2 773 return set_cr_error(-EINVAL);
11fdf7f2
TL
774 } else {
775 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
776 yield {
f67539c2 777 RGWPubSub ps(sync_env->store, owner.tenant);
11fdf7f2 778 rgw_raw_obj obj;
f67539c2 779 ps.get_sub_meta_obj(sub_name, &obj);
11fdf7f2 780 bool empty_on_enoent = false;
9f95a23c 781 call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
11fdf7f2
TL
782 obj,
783 &user_sub_conf, empty_on_enoent));
784 }
785 if (retcode < 0) {
786 mgr->remove_get_sub(owner, sub_name);
787 return set_cr_error(retcode);
788 }
789
9f95a23c 790 *ref = PSSubscription::get_shared(sc, mgr->env, user_sub_conf);
11fdf7f2
TL
791 }
792
793 yield (*ref)->call_init_cr(this);
794 if (retcode < 0) {
f67539c2 795 ldout(sync_env->cct, 1) << "ERROR: failed to init subscription when getting subscription: " << sub_name << dendl;
11fdf7f2
TL
796 mgr->remove_get_sub(owner, sub_name);
797 return set_cr_error(retcode);
798 }
799
11fdf7f2
TL
800 mgr->remove_get_sub(owner, sub_name);
801
802 return set_cr_done();
803 }
804 return 0;
805 }
806
807 void return_result(PSSubscriptionRef *result) override {
808 ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl;
809 if (retcode >= 0) {
810 *result = *ref;
811 }
812 }
813 };
814
815 string sub_id(const rgw_user& owner, const string& sub_name) {
816 string owner_prefix;
817 if (!owner.empty()) {
818 owner_prefix = owner.to_str() + "/";
819 }
820
821 return owner_prefix + sub_name;
822 }
823
eafe8130 824 std::map<std::string, GetSubCR *> get_subs;
11fdf7f2
TL
825
826 GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) {
827 return get_subs[sub_id(owner, name)];
828 }
829
830 void remove_get_sub(const rgw_user& owner, const string& name) {
831 get_subs.erase(sub_id(owner, name));
832 }
833
834 bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) {
835 auto iter = subs.find(sub_id(owner, sub_name));
836 if (iter != subs.end()) {
837 *sub = iter->second;
838 return true;
839 }
840 return false;
841 }
842
9f95a23c
TL
843 PSManager(RGWDataSyncCtx *_sc,
844 PSEnvRef _env) : sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
845 env(_env) {}
846
847public:
9f95a23c 848 static PSManagerRef get_shared(RGWDataSyncCtx *_sc,
11fdf7f2 849 PSEnvRef _env) {
9f95a23c 850 return std::shared_ptr<PSManager>(new PSManager(_sc, _env));
11fdf7f2
TL
851 }
852
9f95a23c 853 static int call_get_subscription_cr(RGWDataSyncCtx *sc, PSManagerRef& mgr,
11fdf7f2
TL
854 RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
855 if (mgr->find_sub_instance(owner, sub_name, ref)) {
856 /* found it! nothing to execute */
9f95a23c 857 ldout(sc->cct, 20) << __func__ << "(): found sub instance" << dendl;
11fdf7f2
TL
858 }
859 auto& gs = mgr->get_get_subs(owner, sub_name);
860 if (!gs) {
9f95a23c
TL
861 ldout(sc->cct, 20) << __func__ << "(): first get subs" << dendl;
862 gs = new GetSubCR(sc, mgr, owner, sub_name, ref);
11fdf7f2 863 }
9f95a23c 864 ldout(sc->cct, 20) << __func__ << "(): executing get subs" << dendl;
11fdf7f2
TL
865 return gs->execute(caller, ref);
866 }
867
868 friend class GetSubCR;
869};
870
871void PSEnv::init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) {
872 manager = mgr;
873 conf->init_instance(realm, instance_id);
874}
875
876class RGWPSInitEnvCBCR : public RGWCoroutine {
9f95a23c 877 RGWDataSyncCtx *sc;
11fdf7f2
TL
878 RGWDataSyncEnv *sync_env;
879 PSEnvRef env;
880 PSConfigRef& conf;
881
882 rgw_user_create_params create_user;
883 rgw_get_user_info_params get_user_info;
884public:
9f95a23c
TL
885 RGWPSInitEnvCBCR(RGWDataSyncCtx *_sc,
886 PSEnvRef& _env) : RGWCoroutine(_sc->cct),
887 sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
888 env(_env), conf(env->conf) {}
889 int operate() override {
890 reenter(this) {
9f95a23c 891 ldpp_dout(sync_env->dpp, 1) << ": init pubsub config zone=" << sc->source_zone << dendl;
11fdf7f2
TL
892
893 /* nothing to do here right now */
894 create_user.user = conf->user;
895 create_user.max_buckets = 0; /* unlimited */
896 create_user.display_name = "pubsub";
897 create_user.generate_key = false;
9f95a23c 898 yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user, sync_env->dpp));
1911f103 899 if (retcode < 0 && retcode != -ERR_USER_EXIST) {
9f95a23c 900 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
11fdf7f2
TL
901 return set_cr_error(retcode);
902 }
903
904 get_user_info.user = conf->user;
905 yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info));
906 if (retcode < 0) {
9f95a23c 907 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
11fdf7f2
TL
908 return set_cr_error(retcode);
909 }
910
9f95a23c 911 ldpp_dout(sync_env->dpp, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl;
11fdf7f2
TL
912
913
914 return set_cr_done();
915 }
916 return 0;
917 }
918};
919
eafe8130
TL
920bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, rgw::notify::EventType event_type) {
921 if (!match(filter.events, event_type)) {
922 return false;
923 }
924 if (!match(filter.s3_filter.key_filter, key_name)) {
925 return false;
926 }
927 return true;
928}
929
11fdf7f2 930class RGWPSFindBucketTopicsCR : public RGWCoroutine {
9f95a23c 931 RGWDataSyncCtx *sc;
11fdf7f2
TL
932 RGWDataSyncEnv *sync_env;
933 PSEnvRef env;
934 rgw_user owner;
935 rgw_bucket bucket;
936 rgw_obj_key key;
eafe8130 937 rgw::notify::EventType event_type;
11fdf7f2 938
f67539c2 939 RGWPubSub ps;
11fdf7f2
TL
940
941 rgw_raw_obj bucket_obj;
942 rgw_raw_obj user_obj;
943 rgw_pubsub_bucket_topics bucket_topics;
f67539c2 944 rgw_pubsub_topics user_topics;
11fdf7f2
TL
945 TopicsRef *topics;
946public:
9f95a23c 947 RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc,
11fdf7f2
TL
948 PSEnvRef& _env,
949 const rgw_user& _owner,
950 const rgw_bucket& _bucket,
951 const rgw_obj_key& _key,
eafe8130 952 rgw::notify::EventType _event_type,
9f95a23c
TL
953 TopicsRef *_topics) : RGWCoroutine(_sc->cct),
954 sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
955 env(_env),
956 owner(_owner),
957 bucket(_bucket),
958 key(_key),
eafe8130 959 event_type(_event_type),
f67539c2 960 ps(sync_env->store, owner.tenant),
11fdf7f2
TL
961 topics(_topics) {
962 *topics = std::make_shared<vector<PSTopicConfigRef> >();
963 }
964 int operate() override {
965 reenter(this) {
f67539c2
TL
966 ps.get_bucket_meta_obj(bucket, &bucket_obj);
967 ps.get_meta_obj(&user_obj);
11fdf7f2
TL
968
969 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
970 yield {
971 bool empty_on_enoent = true;
9f95a23c 972 call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
11fdf7f2
TL
973 bucket_obj,
974 &bucket_topics, empty_on_enoent));
975 }
976 if (retcode < 0 && retcode != -ENOENT) {
977 return set_cr_error(retcode);
978 }
979
980 ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
981
982 if (!bucket_topics.topics.empty()) {
f67539c2 983 using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_topics>;
11fdf7f2
TL
984 yield {
985 bool empty_on_enoent = true;
9f95a23c 986 call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
11fdf7f2
TL
987 user_obj,
988 &user_topics, empty_on_enoent));
989 }
990 if (retcode < 0 && retcode != -ENOENT) {
991 return set_cr_error(retcode);
992 }
993 }
994
995 for (auto& titer : bucket_topics.topics) {
996 auto& topic_filter = titer.second;
997 auto& info = topic_filter.topic;
eafe8130 998 if (!match(topic_filter, key.name, event_type)) {
11fdf7f2
TL
999 continue;
1000 }
eafe8130 1001 std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
11fdf7f2
TL
1002 tc->name = info.name;
1003 tc->subs = user_topics.topics[info.name].subs;
9f95a23c 1004 tc->opaque_data = info.opaque_data;
11fdf7f2
TL
1005 (*topics)->push_back(tc);
1006 }
1007
11fdf7f2
TL
1008 return set_cr_done();
1009 }
1010 return 0;
1011 }
1012};
1013
// Coroutine that delivers one object event to every subscription of every
// topic in `topics`.
//
// For each subscription name it first reads the subscription through
// PSManager::call_get_subscription_cr(). If the subscription's s3_id is empty
// the plain `event` is stored and, when a push endpoint is configured, pushed.
// Otherwise `s3_event` is tagged with the subscription's s3_id and the topic's
// opaque_data and then stored/pushed the same way. Each outcome is reflected
// in a perf counter; a failure is logged and the iteration goes on with the
// next step/subscription.
1014class RGWPSHandleObjEventCR : public RGWCoroutine {
9f95a23c 1015 RGWDataSyncCtx* const sc;
11fdf7f2 1016 const PSEnvRef env;
f67539c2 1017 const rgw_user owner;
eafe8130 1018 const EventRef<rgw_pubsub_event> event;
f67539c2 1019 const EventRef<rgw_pubsub_s3_event> s3_event;
11fdf7f2 1020 const TopicsRef topics;
11fdf7f2
TL
// has_subscriptions: set as soon as any subscription is visited.
// event_handled: set once at least one store or push succeeded.
// Together they drive the "event lost" counter at the end of operate().
1021 bool has_subscriptions;
1022 bool event_handled;
11fdf7f2 1023 PSSubscriptionRef sub;
// Loop positions are kept as members rather than locals; presumably so that
// they keep their values across the yield points inside the loops.
eafe8130 1024 std::vector<PSTopicConfigRef>::const_iterator titer;
9f95a23c 1025 std::set<std::string>::const_iterator siter;
11fdf7f2
TL
1026
1027public:
9f95a23c 1028 RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc,
11fdf7f2
TL
1029 const PSEnvRef _env,
1030 const rgw_user& _owner,
eafe8130 1031 const EventRef<rgw_pubsub_event>& _event,
f67539c2 1032 const EventRef<rgw_pubsub_s3_event>& _s3_event,
9f95a23c
TL
1033 const TopicsRef& _topics) : RGWCoroutine(_sc->cct),
1034 sc(_sc),
11fdf7f2
TL
1035 env(_env),
1036 owner(_owner),
1037 event(_event),
f67539c2 1038 s3_event(_s3_event),
11fdf7f2 1039 topics(_topics),
11fdf7f2
TL
1040 has_subscriptions(false),
1041 event_handled(false) {}
1042
// Runs the delivery loop described on the class. `topics` must not be empty
// (asserted below).
1043 int operate() override {
1044 reenter(this) {
9f95a23c 1045 ldout(sc->cct, 20) << ": handle event: obj: z=" << sc->source_zone
11fdf7f2
TL
1046 << " event=" << json_str("event", *event, false)
1047 << " owner=" << owner << dendl;
1048
9f95a23c 1049 ldout(sc->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;
eafe8130
TL
1050
1051 // outside caller should check that
1052 ceph_assert(!topics->empty());
11fdf7f2
TL
1053
1054 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
1055
eafe8130 1056 // loop over all topics related to the bucket/object
11fdf7f2 1057 for (titer = topics->begin(); titer != topics->end(); ++titer) {
9f95a23c 1058 ldout(sc->cct, 20) << ": notification for " << event->source << ": topic=" <<
11fdf7f2 1059 (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
eafe8130 1060 // loop over all subscriptions of the topic
11fdf7f2 1061 for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
9f95a23c 1062 ldout(sc->cct, 20) << ": subscription: " << *siter << dendl;
11fdf7f2 1063 has_subscriptions = true;
f67539c2
TL
1064 // try to read subscription configuration
1065 yield PSManager::call_get_subscription_cr(sc, env->manager, this, owner, *siter, &sub);
1066 if (retcode < 0) {
1067 if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
1068 ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
1069 << " ret=" << retcode << dendl;
1070 if (retcode == -ENOENT) {
1071 // missing subscription info should be reflected back as invalid argument
1072 // and not as missing object
1073 retcode = -EINVAL;
1074 }
// NOTE(review): the remapped retcode only reaches the final check below when
// this was the last subscription visited; a later yield presumably replaces
// retcode with its own result - confirm this is intended.
1075 // try the next subscription
1076 continue;
1077 }
// An empty s3_id selects the plain `event`; a non-empty one selects `s3_event`.
1078 if (sub->sub_conf->s3_id.empty()) {
1079 // subscription was not made by S3 compatible API
1080 ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
1081 yield call(PSSubscription::store_event_cr(sc, sub, event));
11fdf7f2 1082 if (retcode < 0) {
f67539c2
TL
1083 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
1084 ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
1085 } else {
1086 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
1087 event_handled = true;
11fdf7f2 1088 }
f67539c2
TL
// The push is attempted whether or not the store above succeeded.
1089 if (sub->sub_conf->push_endpoint) {
1090 ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
1091 yield call(PSSubscription::push_event_cr(sc, sub, event));
eafe8130 1092 if (retcode < 0) {
f67539c2
TL
1093 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
1094 ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
eafe8130 1095 } else {
f67539c2 1096 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
eafe8130
TL
1097 event_handled = true;
1098 }
f67539c2
TL
1099 }
1100 } else {
1101 // subscription was made by S3 compatible API
1102 ldout(sc->cct, 20) << "storing s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
// The same s3_event object is re-tagged for every S3 subscription before it
// is stored/pushed.
1103 s3_event->configurationId = sub->sub_conf->s3_id;
1104 s3_event->opaque_data = (*titer)->opaque_data;
1105 yield call(PSSubscription::store_event_cr(sc, sub, s3_event));
1106 if (retcode < 0) {
1107 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
1108 ldout(sc->cct, 1) << "ERROR: failed to store s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
eafe8130 1109 } else {
f67539c2
TL
1110 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
1111 event_handled = true;
1112 }
1113 if (sub->sub_conf->push_endpoint) {
1114 ldout(sc->cct, 20) << "push s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
1115 yield call(PSSubscription::push_event_cr(sc, sub, s3_event));
11fdf7f2 1116 if (retcode < 0) {
f67539c2
TL
1117 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
1118 ldout(sc->cct, 1) << "ERROR: failed to push s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
11fdf7f2 1119 } else {
f67539c2 1120 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
11fdf7f2
TL
1121 event_handled = true;
1122 }
1123 }
1124 }
11fdf7f2
TL
1125 }
1126 }
1127 if (has_subscriptions && !event_handled) {
1128 // event is considered "lost" if it has subscriptions on any of its topics
1129 // but it was not stored in, or pushed to, any of them
1130 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost);
1131 }
// The coroutine result is taken from whatever value retcode holds at this
// point, i.e. it is not an aggregate over all subscriptions.
1132 if (retcode < 0) {
1133 return set_cr_error(retcode);
1134 }
1135 return set_cr_done();
1136 }
1137 return 0;
1138 }
1139};
1140
eafe8130 1141// coroutine invoked on remote object creation
11fdf7f2 1142class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
9f95a23c
TL
1143 RGWDataSyncCtx *sc;
1144 rgw_bucket_sync_pipe sync_pipe;
11fdf7f2
TL
1145 PSEnvRef env;
1146 std::optional<uint64_t> versioned_epoch;
eafe8130 1147 EventRef<rgw_pubsub_event> event;
f67539c2 1148 EventRef<rgw_pubsub_s3_event> s3_event;
11fdf7f2
TL
1149 TopicsRef topics;
1150public:
9f95a23c
TL
1151 RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
1152 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
11fdf7f2 1153 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
9f95a23c
TL
1154 TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1155 sc(_sc),
1156 sync_pipe(_sync_pipe),
11fdf7f2
TL
1157 env(_env),
1158 versioned_epoch(_versioned_epoch),
1159 topics(_topics) {
1160 }
1161 int operate() override {
1162 reenter(this) {
9f95a23c
TL
1163 ldout(sc->cct, 20) << ": stat of remote obj: z=" << sc->source_zone
1164 << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
11fdf7f2
TL
1165 << " attrs=" << attrs << dendl;
1166 {
1167 std::vector<std::pair<std::string, std::string> > attrs;
1168 for (auto& attr : attrs) {
9f95a23c 1169 std::string k = attr.first;
11fdf7f2
TL
1170 if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
1171 k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
1172 }
1173 attrs.push_back(std::make_pair(k, attr.second));
eafe8130 1174 }
f67539c2 1175 // at this point we don't know whether we need the ceph event or S3 event
eafe8130
TL
1176 // this is why both are created here, once we have information about the
1177 // subscription, we will store/push only the relevant ones
9f95a23c
TL
1178 make_event_ref(sc->cct,
1179 sync_pipe.info.source_bs.bucket, key,
11fdf7f2 1180 mtime, &attrs,
eafe8130 1181 rgw::notify::ObjectCreated, &event);
f67539c2 1182 make_s3_event_ref(sc->cct,
9f95a23c 1183 sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
eafe8130 1184 mtime, &attrs,
f67539c2 1185 rgw::notify::ObjectCreated, &s3_event);
11fdf7f2
TL
1186 }
1187
f67539c2 1188 yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, s3_event, topics));
11fdf7f2
TL
1189 if (retcode < 0) {
1190 return set_cr_error(retcode);
1191 }
1192 return set_cr_done();
1193 }
1194 return 0;
1195 }
1196};
1197
1198class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
9f95a23c 1199 rgw_bucket_sync_pipe sync_pipe;
11fdf7f2
TL
1200 PSEnvRef env;
1201 std::optional<uint64_t> versioned_epoch;
1202 TopicsRef topics;
1203public:
9f95a23c
TL
1204 RGWPSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
1205 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
11fdf7f2 1206 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
9f95a23c
TL
1207 TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1208 sync_pipe(_sync_pipe),
11fdf7f2
TL
1209 env(_env), versioned_epoch(_versioned_epoch),
1210 topics(_topics) {
1211 }
1212
1213 ~RGWPSHandleRemoteObjCR() override {}
1214
1215 RGWStatRemoteObjCBCR *allocate_callback() override {
9f95a23c 1216 return new RGWPSHandleRemoteObjCBCR(sc, sync_pipe, key, env, versioned_epoch, topics);
11fdf7f2
TL
1217 }
1218};
1219
1220class RGWPSHandleObjCreateCR : public RGWCoroutine {
9f95a23c
TL
1221 RGWDataSyncCtx *sc;
1222 rgw_bucket_sync_pipe sync_pipe;
11fdf7f2
TL
1223 rgw_obj_key key;
1224 PSEnvRef env;
1225 std::optional<uint64_t> versioned_epoch;
1226 TopicsRef topics;
1227public:
9f95a23c
TL
1228 RGWPSHandleObjCreateCR(RGWDataSyncCtx *_sc,
1229 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
1230 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch) : RGWCoroutine(_sc->cct),
1231 sc(_sc),
1232 sync_pipe(_sync_pipe),
11fdf7f2
TL
1233 key(_key),
1234 env(_env),
1235 versioned_epoch(_versioned_epoch) {
1236 }
1237
1238 ~RGWPSHandleObjCreateCR() override {}
1239
1240 int operate() override {
1241 reenter(this) {
9f95a23c
TL
1242 yield call(new RGWPSFindBucketTopicsCR(sc, env, sync_pipe.dest_bucket_info.owner,
1243 sync_pipe.info.source_bs.bucket, key,
eafe8130 1244 rgw::notify::ObjectCreated,
11fdf7f2
TL
1245 &topics));
1246 if (retcode < 0) {
9f95a23c 1247 ldout(sc->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
11fdf7f2
TL
1248 return set_cr_error(retcode);
1249 }
1250 if (topics->empty()) {
9f95a23c 1251 ldout(sc->cct, 20) << "no topics found for " << sync_pipe.info.source_bs.bucket << "/" << key << dendl;
11fdf7f2
TL
1252 return set_cr_done();
1253 }
9f95a23c 1254 yield call(new RGWPSHandleRemoteObjCR(sc, sync_pipe, key, env, versioned_epoch, topics));
11fdf7f2
TL
1255 if (retcode < 0) {
1256 return set_cr_error(retcode);
1257 }
1258 return set_cr_done();
1259 }
1260 return 0;
1261 }
1262};
1263
eafe8130 1264// coroutine invoked on remote object deletion
11fdf7f2 1265class RGWPSGenericObjEventCBCR : public RGWCoroutine {
9f95a23c 1266 RGWDataSyncCtx *sc;
11fdf7f2
TL
1267 PSEnvRef env;
1268 rgw_user owner;
1269 rgw_bucket bucket;
1270 rgw_obj_key key;
1271 ceph::real_time mtime;
eafe8130
TL
1272 rgw::notify::EventType event_type;
1273 EventRef<rgw_pubsub_event> event;
f67539c2 1274 EventRef<rgw_pubsub_s3_event> s3_event;
11fdf7f2
TL
1275 TopicsRef topics;
1276public:
9f95a23c 1277 RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc,
11fdf7f2 1278 PSEnvRef _env,
9f95a23c
TL
1279 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
1280 rgw::notify::EventType _event_type) : RGWCoroutine(_sc->cct),
1281 sc(_sc),
11fdf7f2 1282 env(_env),
9f95a23c
TL
1283 owner(_sync_pipe.dest_bucket_info.owner),
1284 bucket(_sync_pipe.dest_bucket_info.bucket),
11fdf7f2 1285 key(_key),
eafe8130 1286 mtime(_mtime), event_type(_event_type) {}
11fdf7f2
TL
1287 int operate() override {
1288 reenter(this) {
9f95a23c 1289 ldout(sc->cct, 20) << ": remove remote obj: z=" << sc->source_zone
11fdf7f2 1290 << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
9f95a23c 1291 yield call(new RGWPSFindBucketTopicsCR(sc, env, owner, bucket, key, event_type, &topics));
11fdf7f2 1292 if (retcode < 0) {
9f95a23c 1293 ldout(sc->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
11fdf7f2
TL
1294 return set_cr_error(retcode);
1295 }
1296 if (topics->empty()) {
9f95a23c 1297 ldout(sc->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
11fdf7f2
TL
1298 return set_cr_done();
1299 }
f67539c2 1300 // at this point we don't know whether we need the ceph event or S3 event
eafe8130
TL
1301 // this is why both are created here, once we have information about the
1302 // subscription, we will store/push only the relevant ones
9f95a23c 1303 make_event_ref(sc->cct,
eafe8130
TL
1304 bucket, key,
1305 mtime, nullptr,
1306 event_type, &event);
f67539c2 1307 make_s3_event_ref(sc->cct,
eafe8130
TL
1308 bucket, owner, key,
1309 mtime, nullptr,
f67539c2
TL
1310 event_type, &s3_event);
1311 yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, s3_event, topics));
11fdf7f2
TL
1312 if (retcode < 0) {
1313 return set_cr_error(retcode);
1314 }
1315 return set_cr_done();
1316 }
1317 return 0;
1318 }
1319
1320};
1321
1322class RGWPSDataSyncModule : public RGWDataSyncModule {
1323 PSEnvRef env;
1324 PSConfigRef& conf;
eafe8130 1325
11fdf7f2
TL
1326public:
1327 RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared<PSEnv>()), conf(env->conf) {
1328 env->init(cct, config);
1329 }
eafe8130 1330
11fdf7f2
TL
1331 ~RGWPSDataSyncModule() override {}
1332
9f95a23c
TL
1333 void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
1334 auto sync_env = sc->env;
1335 PSManagerRef mgr = PSManager::get_shared(sc, env);
1336 env->init_instance(sync_env->svc->zone->get_realm(), instance_id, mgr);
11fdf7f2
TL
1337 }
1338
9f95a23c
TL
1339 RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override {
1340 ldout(sc->cct, 5) << conf->id << ": start" << dendl;
1341 return new RGWPSInitEnvCBCR(sc, env);
11fdf7f2 1342 }
eafe8130 1343
9f95a23c 1344 RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
eafe8130 1345 rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
9f95a23c 1346 ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe <<
eafe8130 1347 " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
9f95a23c 1348 return new RGWPSHandleObjCreateCR(sc, sync_pipe, key, env, versioned_epoch);
11fdf7f2 1349 }
eafe8130 1350
9f95a23c 1351 RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
eafe8130 1352 rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
9f95a23c 1353 ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe <<
eafe8130 1354 " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
9f95a23c 1355 return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete);
11fdf7f2 1356 }
eafe8130 1357
9f95a23c 1358 RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
eafe8130 1359 rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
9f95a23c 1360 ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe <<
eafe8130 1361 " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
9f95a23c 1362 return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
11fdf7f2
TL
1363 }
1364
1365 PSConfigRef& get_conf() { return conf; }
1366};
1367
1368RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
1369{
1370 data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
9f95a23c 1371 const std::string jconf = json_str("conf", *data_handler->get_conf());
11fdf7f2
TL
1372 JSONParser p;
1373 if (!p.parse(jconf.c_str(), jconf.size())) {
eafe8130 1374 ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
11fdf7f2
TL
1375 effective_conf = config;
1376 } else {
1377 effective_conf.decode_json(&p);
1378 }
1379}
1380
1381RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
1382{
1383 return data_handler.get();
1384}
1385
1386RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
1387 if (dialect != RGW_REST_S3) {
1388 return orig;
1389 }
eafe8130
TL
1390 return new RGWRESTMgr_PubSub();
1391}
1392
1393bool RGWPSSyncModuleInstance::should_full_sync() const {
1394 return data_handler->get_conf()->start_with_full_sync;
11fdf7f2
TL
1395}
1396
1397int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
1398 instance->reset(new RGWPSSyncModuleInstance(cct, config));
1399 return 0;
1400}
1401
eafe8130 1402