]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_sync_module_es.cc
update source to 12.2.11
[ceph.git] / ceph / src / rgw / rgw_sync_module_es.cc
1 #include "rgw_common.h"
2 #include "rgw_coroutine.h"
3 #include "rgw_sync_module.h"
4 #include "rgw_data_sync.h"
5 #include "rgw_sync_module_es.h"
6 #include "rgw_sync_module_es_rest.h"
7 #include "rgw_rest_conn.h"
8 #include "rgw_cr_rest.h"
9 #include "rgw_op.h"
10 #include "rgw_es_query.h"
11
12 #include "include/str_list.h"
13
14 #include <boost/asio/yield.hpp>
15
16 #define dout_subsys ceph_subsys_rgw
17
18
19 /*
20 * whitelist utility. Config string is a list of entries, where an entry is either an item,
21 * a prefix, or a suffix. An item would be the name of the entity that we'd look up,
22 * a prefix would be a string ending with an asterisk, a suffix would be a string starting
23 * with an asterisk. For example:
24 *
25 * bucket1, bucket2, foo*, *bar
26 */
27 class ItemList {
28 bool approve_all{false};
29
30 set<string> entries;
31 set<string> prefixes;
32 set<string> suffixes;
33
34 void parse(const string& str) {
35 list<string> l;
36
37 get_str_list(str, ",", l);
38
39 for (auto& entry : l) {
40 entry = rgw_trim_whitespace(entry);
41 if (entry.empty()) {
42 continue;
43 }
44
45 if (entry == "*") {
46 approve_all = true;
47 return;
48 }
49
50 if (entry[0] == '*') {
51 suffixes.insert(entry.substr(1));
52 continue;
53 }
54
55 if (entry.back() == '*') {
56 prefixes.insert(entry.substr(0, entry.size() - 1));
57 continue;
58 }
59
60 entries.insert(entry);
61 }
62 }
63
64 public:
65 ItemList() {}
66 void init(const string& str, bool def_val) {
67 if (str.empty()) {
68 approve_all = def_val;
69 } else {
70 parse(str);
71 }
72 }
73
74 bool exists(const string& entry) {
75 if (approve_all) {
76 return true;
77 }
78
79 if (entries.find(entry) != entries.end()) {
80 return true;
81 }
82
83 auto i = prefixes.upper_bound(entry);
84 if (i != prefixes.begin()) {
85 --i;
86 if (boost::algorithm::starts_with(entry, *i)) {
87 return true;
88 }
89 }
90
91 for (i = suffixes.begin(); i != suffixes.end(); ++i) {
92 if (boost::algorithm::ends_with(entry, *i)) {
93 return true;
94 }
95 }
96
97 return false;
98 }
99 };
100
/* lower bound enforced on the configured number of index shards */
#define ES_NUM_SHARDS_MIN 5

/* values used when the module config does not set num_shards / num_replicas */
#define ES_NUM_SHARDS_DEFAULT   16
#define ES_NUM_REPLICAS_DEFAULT 1
105
106 struct ElasticConfig {
107 uint64_t sync_instance{0};
108 string id;
109 string index_path;
110 std::unique_ptr<RGWRESTConn> conn;
111 bool explicit_custom_meta{true};
112 string override_index_path;
113 ItemList index_buckets;
114 ItemList allow_owners;
115 uint32_t num_shards{0};
116 uint32_t num_replicas{0};
117
118 void init(CephContext *cct, const map<string, string, ltstr_nocase>& config) {
119 string elastic_endpoint = rgw_conf_get(config, "endpoint", "");
120 id = string("elastic:") + elastic_endpoint;
121 conn.reset(new RGWRESTConn(cct, nullptr, id, { elastic_endpoint }));
122 explicit_custom_meta = rgw_conf_get_bool(config, "explicit_custom_meta", true);
123 index_buckets.init(rgw_conf_get(config, "index_buckets_list", ""), true); /* approve all buckets by default */
124 allow_owners.init(rgw_conf_get(config, "approved_owners_list", ""), true); /* approve all bucket owners by default */
125 override_index_path = rgw_conf_get(config, "override_index_path", "");
126 num_shards = rgw_conf_get_int(config, "num_shards", ES_NUM_SHARDS_DEFAULT);
127 if (num_shards < ES_NUM_SHARDS_MIN) {
128 num_shards = ES_NUM_SHARDS_MIN;
129 }
130 num_replicas = rgw_conf_get_int(config, "num_replicas", ES_NUM_REPLICAS_DEFAULT);
131 }
132
133 void init_instance(RGWRealm& realm, uint64_t instance_id) {
134 sync_instance = instance_id;
135
136 if (!override_index_path.empty()) {
137 index_path = override_index_path;
138 return;
139 }
140
141 char buf[32];
142 snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF));
143
144 index_path = "/rgw-" + realm.get_name() + buf;
145 }
146
147 string get_index_path() {
148 return index_path;
149 }
150
151 string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) {
152 return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
153 }
154
155 bool should_handle_operation(RGWBucketInfo& bucket_info) {
156 return index_buckets.exists(bucket_info.bucket.name) &&
157 allow_owners.exists(bucket_info.owner.to_str());
158 }
159 };
160
161 using ElasticConfigRef = std::shared_ptr<ElasticConfig>;
162
163 struct es_dump_type {
164 const char *type;
165 const char *format;
166 bool analyzed;
167
168 es_dump_type(const char *t, const char *f = nullptr, bool a = false) : type(t), format(f), analyzed(a) {}
169
170 void dump(Formatter *f) const {
171 encode_json("type", type, f);
172 if (format) {
173 encode_json("format", format, f);
174 }
175 if (!analyzed && strcmp(type, "string") == 0) {
176 encode_json("index", "not_analyzed", f);
177 }
178 }
179 };
180
181 struct es_index_mappings {
182 void dump_custom(Formatter *f, const char *section, const char *type, const char *format) const {
183 f->open_object_section(section);
184 ::encode_json("type", "nested", f);
185 f->open_object_section("properties");
186 encode_json("name", es_dump_type("string"), f);
187 encode_json("value", es_dump_type(type, format), f);
188 f->close_section(); // entry
189 f->close_section(); // custom-string
190 }
191 void dump(Formatter *f) const {
192 f->open_object_section("object");
193 f->open_object_section("properties");
194 encode_json("bucket", es_dump_type("string"), f);
195 encode_json("name", es_dump_type("string"), f);
196 encode_json("instance", es_dump_type("string"), f);
197 encode_json("versioned_epoch", es_dump_type("long"), f);
198 f->open_object_section("meta");
199 f->open_object_section("properties");
200 encode_json("cache_control", es_dump_type("string"), f);
201 encode_json("content_disposition", es_dump_type("string"), f);
202 encode_json("content_encoding", es_dump_type("string"), f);
203 encode_json("content_language", es_dump_type("string"), f);
204 encode_json("content_type", es_dump_type("string"), f);
205 encode_json("etag", es_dump_type("string"), f);
206 encode_json("expires", es_dump_type("string"), f);
207 f->open_object_section("mtime");
208 ::encode_json("type", "date", f);
209 ::encode_json("format", "strict_date_optional_time||epoch_millis", f);
210 f->close_section(); // mtime
211 encode_json("size", es_dump_type("long"), f);
212 dump_custom(f, "custom-string", "string", nullptr);
213 dump_custom(f, "custom-int", "long", nullptr);
214 dump_custom(f, "custom-date", "date", "strict_date_optional_time||epoch_millis");
215 f->close_section(); // properties
216 f->close_section(); // meta
217 f->close_section(); // properties
218 f->close_section(); // object
219 }
220 };
221
222 struct es_index_settings {
223 uint32_t num_replicas;
224 uint32_t num_shards;
225
226 es_index_settings(uint32_t _replicas, uint32_t _shards) : num_replicas(_replicas), num_shards(_shards) {}
227
228 void dump(Formatter *f) const {
229 encode_json("number_of_replicas", num_replicas, f);
230 encode_json("number_of_shards", num_shards, f);
231 }
232 };
233
234 struct es_index_config {
235 es_index_settings settings;
236 es_index_mappings mappings;
237
238 es_index_config(es_index_settings& _s, es_index_mappings& _m) : settings(_s), mappings(_m) {}
239
240 void dump(Formatter *f) const {
241 encode_json("settings", settings, f);
242 encode_json("mappings", mappings, f);
243 }
244 };
245
246 static bool is_sys_attr(const std::string& attr_name){
247 static constexpr std::initializer_list<const char*> rgw_sys_attrs = {RGW_ATTR_PG_VER,
248 RGW_ATTR_SOURCE_ZONE,
249 RGW_ATTR_ID_TAG,
250 RGW_ATTR_TEMPURL_KEY1,
251 RGW_ATTR_TEMPURL_KEY2,
252 RGW_ATTR_UNIX1,
253 RGW_ATTR_UNIX_KEY1
254 };
255
256 return std::find(rgw_sys_attrs.begin(), rgw_sys_attrs.end(), attr_name) != rgw_sys_attrs.end();
257 }
258
259 struct es_obj_metadata {
260 CephContext *cct;
261 ElasticConfigRef es_conf;
262 RGWBucketInfo bucket_info;
263 rgw_obj_key key;
264 ceph::real_time mtime;
265 uint64_t size;
266 map<string, bufferlist> attrs;
267 uint64_t versioned_epoch;
268
269 es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info,
270 const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size,
271 map<string, bufferlist>& _attrs, uint64_t _versioned_epoch) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key),
272 mtime(_mtime), size(_size), attrs(std::move(_attrs)), versioned_epoch(_versioned_epoch) {}
273
274 void dump(Formatter *f) const {
275 map<string, string> out_attrs;
276 map<string, string> custom_meta;
277 RGWAccessControlPolicy policy;
278 set<string> permissions;
279 RGWObjTags obj_tags;
280
281 for (auto i : attrs) {
282 const string& attr_name = i.first;
283 bufferlist& val = i.second;
284
285 if (attr_name.compare(0, sizeof(RGW_ATTR_PREFIX) - 1, RGW_ATTR_PREFIX) != 0) {
286 continue;
287 }
288
289 if (attr_name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1, RGW_ATTR_META_PREFIX) == 0) {
290 custom_meta.emplace(attr_name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1),
291 string(val.c_str(), (val.length() > 0 ? val.length() - 1 : 0)));
292 continue;
293 }
294
295 if (attr_name.compare(0, sizeof(RGW_ATTR_CRYPT_PREFIX) -1, RGW_ATTR_CRYPT_PREFIX) == 0) {
296 continue;
297 }
298
299 if (attr_name == RGW_ATTR_ACL) {
300 try {
301 auto i = val.begin();
302 ::decode(policy, i);
303 } catch (buffer::error& err) {
304 ldout(cct, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl;
305 continue;
306 }
307
308 const RGWAccessControlList& acl = policy.get_acl();
309
310 permissions.insert(policy.get_owner().get_id().to_str());
311 for (auto acliter : acl.get_grant_map()) {
312 const ACLGrant& grant = acliter.second;
313 if (grant.get_type().get_type() == ACL_TYPE_CANON_USER &&
314 ((uint32_t)grant.get_permission().get_permissions() & RGW_PERM_READ) != 0) {
315 rgw_user user;
316 if (grant.get_id(user)) {
317 permissions.insert(user.to_str());
318 }
319 }
320 }
321 } else if (attr_name == RGW_ATTR_TAGS) {
322 try {
323 auto tags_bl = val.begin();
324 ::decode(obj_tags, tags_bl);
325 } catch (buffer::error& err) {
326 ldout(cct,0) << "ERROR: failed to decode obj tags for "
327 << bucket_info.bucket << "/" << key << dendl;
328 continue;
329 }
330 } else if (attr_name == RGW_ATTR_COMPRESSION) {
331 RGWCompressionInfo cs_info;
332 try {
333 auto vals_bl = val.begin();
334 ::decode(cs_info, vals_bl);
335 } catch (buffer::error& err) {
336 ldout(cct,0) << "ERROR: failed to decode compression attr for "
337 << bucket_info.bucket << "/" << key << dendl;
338 continue;
339 }
340 out_attrs.emplace("compression",std::move(cs_info.compression_type));
341 } else {
342 if (!is_sys_attr(attr_name)) {
343 out_attrs.emplace(attr_name.substr(sizeof(RGW_ATTR_PREFIX) - 1),
344 std::string(val.c_str(), (val.length() > 0 ? val.length() - 1 : 0)));
345 }
346 }
347 }
348 ::encode_json("bucket", bucket_info.bucket.name, f);
349 ::encode_json("name", key.name, f);
350 ::encode_json("instance", key.instance, f);
351 ::encode_json("versioned_epoch", versioned_epoch, f);
352 ::encode_json("owner", policy.get_owner(), f);
353 ::encode_json("permissions", permissions, f);
354 f->open_object_section("meta");
355 ::encode_json("size", size, f);
356
357 string mtime_str;
358 rgw_to_iso8601(mtime, &mtime_str);
359 ::encode_json("mtime", mtime_str, f);
360 for (auto i : out_attrs) {
361 ::encode_json(i.first.c_str(), i.second, f);
362 }
363 map<string, string> custom_str;
364 map<string, string> custom_int;
365 map<string, string> custom_date;
366
367 for (auto i : custom_meta) {
368 auto config = bucket_info.mdsearch_config.find(i.first);
369 if (config == bucket_info.mdsearch_config.end()) {
370 if (!es_conf->explicit_custom_meta) {
371 /* default custom meta is of type string */
372 custom_str[i.first] = i.second;
373 } else {
374 ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl;
375 }
376 continue;
377 }
378 switch (config->second) {
379 case ESEntityTypeMap::ES_ENTITY_DATE:
380 custom_date[i.first] = i.second;
381 break;
382 case ESEntityTypeMap::ES_ENTITY_INT:
383 custom_int[i.first] = i.second;
384 break;
385 default:
386 custom_str[i.first] = i.second;
387 }
388 }
389
390 if (!custom_str.empty()) {
391 f->open_array_section("custom-string");
392 for (auto i : custom_str) {
393 f->open_object_section("entity");
394 ::encode_json("name", i.first.c_str(), f);
395 ::encode_json("value", i.second, f);
396 f->close_section();
397 }
398 f->close_section();
399 }
400 if (!custom_int.empty()) {
401 f->open_array_section("custom-int");
402 for (auto i : custom_int) {
403 f->open_object_section("entity");
404 ::encode_json("name", i.first.c_str(), f);
405 ::encode_json("value", i.second, f);
406 f->close_section();
407 }
408 f->close_section();
409 }
410 if (!custom_date.empty()) {
411 f->open_array_section("custom-date");
412 for (auto i : custom_date) {
413 /*
414 * try to exlicitly parse date field, otherwise elasticsearch could reject the whole doc,
415 * which will end up with failed sync
416 */
417 real_time t;
418 int r = parse_time(i.second.c_str(), &t);
419 if (r < 0) {
420 ldout(cct, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl;
421 continue;
422 }
423
424 string time_str;
425 rgw_to_iso8601(t, &time_str);
426
427 f->open_object_section("entity");
428 ::encode_json("name", i.first.c_str(), f);
429 ::encode_json("value", time_str.c_str(), f);
430 f->close_section();
431 }
432 f->close_section();
433 }
434 f->close_section(); // meta
435 const auto& m = obj_tags.get_tags();
436 if (m.size() > 0){
437 f->open_array_section("tagging");
438 for (const auto &it : m) {
439 f->open_object_section("tag");
440 ::encode_json("key", it.first, f);
441 ::encode_json("value",it.second, f);
442 f->close_section();
443 }
444 f->close_section(); // tagging
445 }
446 }
447 };
448
449 class RGWElasticInitConfigCBCR : public RGWCoroutine {
450 RGWDataSyncEnv *sync_env;
451 ElasticConfigRef conf;
452 public:
453 RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env,
454 ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
455 sync_env(_sync_env),
456 conf(_conf) {}
457 int operate() override {
458 reenter(this) {
459 ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl;
460 yield {
461 string path = conf->get_index_path();
462
463 es_index_settings settings(conf->num_replicas, conf->num_shards);
464 es_index_mappings mappings;
465
466 es_index_config index_conf(settings, mappings);
467
468 call(new RGWPutRESTResourceCR<es_index_config, int>(sync_env->cct, conf->conn.get(),
469 sync_env->http_manager,
470 path, nullptr /* params */,
471 index_conf, nullptr /* result */));
472 }
473 if (retcode < 0) {
474 return set_cr_error(retcode);
475 }
476 return set_cr_done();
477 }
478 return 0;
479 }
480
481 };
482
483 class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
484 ElasticConfigRef conf;
485 uint64_t versioned_epoch;
486 public:
487 RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
488 RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
489 ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf),
490 versioned_epoch(_versioned_epoch) {}
491 int operate() override {
492 reenter(this) {
493 ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
494 << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
495 << " attrs=" << attrs << dendl;
496 yield {
497 string path = conf->get_obj_path(bucket_info, key);
498 es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch);
499
500 call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
501 sync_env->http_manager,
502 path, nullptr /* params */,
503 doc, nullptr /* result */));
504
505 }
506 if (retcode < 0) {
507 return set_cr_error(retcode);
508 }
509 return set_cr_done();
510 }
511 return 0;
512 }
513 };
514
515 class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
516 ElasticConfigRef conf;
517 uint64_t versioned_epoch;
518 public:
519 RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
520 RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
521 ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
522 conf(_conf), versioned_epoch(_versioned_epoch) {
523 }
524
525 ~RGWElasticHandleRemoteObjCR() override {}
526
527 RGWStatRemoteObjCBCR *allocate_callback() override {
528 return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch);
529 }
530 };
531
532 class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine {
533 RGWDataSyncEnv *sync_env;
534 RGWBucketInfo bucket_info;
535 rgw_obj_key key;
536 ceph::real_time mtime;
537 ElasticConfigRef conf;
538 public:
539 RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
540 RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
541 ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
542 bucket_info(_bucket_info), key(_key),
543 mtime(_mtime), conf(_conf) {}
544 int operate() override {
545 reenter(this) {
546 ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
547 << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
548 yield {
549 string path = conf->get_obj_path(bucket_info, key);
550
551 call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
552 sync_env->http_manager,
553 path, nullptr /* params */));
554 }
555 if (retcode < 0) {
556 return set_cr_error(retcode);
557 }
558 return set_cr_done();
559 }
560 return 0;
561 }
562
563 };
564
565 class RGWElasticDataSyncModule : public RGWDataSyncModule {
566 ElasticConfigRef conf;
567 public:
568 RGWElasticDataSyncModule(CephContext *cct, const map<string, string, ltstr_nocase>& config) : conf(std::make_shared<ElasticConfig>()) {
569 conf->init(cct, config);
570 }
571 ~RGWElasticDataSyncModule() override {}
572
573 void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
574 conf->init_instance(sync_env->store->get_realm(), instance_id);
575 }
576
577 RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
578 ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
579 return new RGWElasticInitConfigCBCR(sync_env, conf);
580 }
581 RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, boost::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
582 ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
583 if (!conf->should_handle_operation(bucket_info)) {
584 ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
585 return nullptr;
586 }
587 return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch.value_or(0));
588 }
589 RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
590 /* versioned and versioned epoch params are useless in the elasticsearch backend case */
591 ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
592 if (!conf->should_handle_operation(bucket_info)) {
593 ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
594 return nullptr;
595 }
596 return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
597 }
598 RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
599 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
600 ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
601 << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
602 ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
603 return NULL;
604 }
605 RGWRESTConn *get_rest_conn() {
606 return conf->conn.get();
607 }
608
609 string get_index_path() {
610 return conf->get_index_path();
611 }
612 };
613
614 RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const map<string, string, ltstr_nocase>& config)
615 {
616 data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, config));
617 }
618
619 RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler()
620 {
621 return data_handler.get();
622 }
623
624 RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn()
625 {
626 return data_handler->get_rest_conn();
627 }
628
629 string RGWElasticSyncModuleInstance::get_index_path() {
630 return data_handler->get_index_path();
631 }
632
633 RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
634 if (dialect != RGW_REST_S3) {
635 return orig;
636 }
637 delete orig;
638 return new RGWRESTMgr_MDSearch_S3();
639 }
640
641 int RGWElasticSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) {
642 string endpoint;
643 auto i = config.find("endpoint");
644 if (i != config.end()) {
645 endpoint = i->second;
646 }
647 instance->reset(new RGWElasticSyncModuleInstance(cct, config));
648 return 0;
649 }
650