]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_sync_module_es.cc
import ceph 12.2.12
[ceph.git] / ceph / src / rgw / rgw_sync_module_es.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_b64.h"
5 #include "rgw_common.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_sync_module.h"
8 #include "rgw_data_sync.h"
9 #include "rgw_sync_module_es.h"
10 #include "rgw_sync_module_es_rest.h"
11 #include "rgw_rest_conn.h"
12 #include "rgw_cr_rest.h"
13 #include "rgw_op.h"
14 #include "rgw_es_query.h"
15
16 #include "include/str_list.h"
17
18 #include <boost/asio/yield.hpp>
19
20 #define dout_subsys ceph_subsys_rgw
21
22
23 /*
24 * whitelist utility. Config string is a list of entries, where an entry is either an item,
25 * a prefix, or a suffix. An item would be the name of the entity that we'd look up,
26 * a prefix would be a string ending with an asterisk, a suffix would be a string starting
27 * with an asterisk. For example:
28 *
29 * bucket1, bucket2, foo*, *bar
30 */
31 class ItemList {
32 bool approve_all{false};
33
34 set<string> entries;
35 set<string> prefixes;
36 set<string> suffixes;
37
38 void parse(const string& str) {
39 list<string> l;
40
41 get_str_list(str, ",", l);
42
43 for (auto& entry : l) {
44 entry = rgw_trim_whitespace(entry);
45 if (entry.empty()) {
46 continue;
47 }
48
49 if (entry == "*") {
50 approve_all = true;
51 return;
52 }
53
54 if (entry[0] == '*') {
55 suffixes.insert(entry.substr(1));
56 continue;
57 }
58
59 if (entry.back() == '*') {
60 prefixes.insert(entry.substr(0, entry.size() - 1));
61 continue;
62 }
63
64 entries.insert(entry);
65 }
66 }
67
68 public:
69 ItemList() {}
70 void init(const string& str, bool def_val) {
71 if (str.empty()) {
72 approve_all = def_val;
73 } else {
74 parse(str);
75 }
76 }
77
78 bool exists(const string& entry) {
79 if (approve_all) {
80 return true;
81 }
82
83 if (entries.find(entry) != entries.end()) {
84 return true;
85 }
86
87 auto i = prefixes.upper_bound(entry);
88 if (i != prefixes.begin()) {
89 --i;
90 if (boost::algorithm::starts_with(entry, *i)) {
91 return true;
92 }
93 }
94
95 for (i = suffixes.begin(); i != suffixes.end(); ++i) {
96 if (boost::algorithm::ends_with(entry, *i)) {
97 return true;
98 }
99 }
100
101 return false;
102 }
103 };
104
/* Index creation defaults. A configured shard count below
 * ES_NUM_SHARDS_MIN is raised to that minimum. */
static constexpr uint32_t ES_NUM_SHARDS_MIN = 5;

static constexpr uint32_t ES_NUM_SHARDS_DEFAULT = 16;
static constexpr uint32_t ES_NUM_REPLICAS_DEFAULT = 1;

/* Elasticsearch server version as (major, minor); std::pair compares
 * lexicographically, so ordinary relational operators work on it. */
using ESVersion = std::pair<int, int>;
/* Versions >= this use the 5.x mapping dialect. */
static constexpr ESVersion ES_V5{5, 0};
112
113 struct ESInfo {
114 std::string name;
115 std::string cluster_name;
116 std::string cluster_uuid;
117 ESVersion version;
118
119 void decode_json(JSONObj *obj);
120
121 std::string get_version_str(){
122 return std::to_string(version.first) + "." + std::to_string(version.second);
123 }
124 };
125
126 // simple wrapper structure to wrap the es version nested type
127 struct es_version_decoder {
128 ESVersion version;
129
130 int parse_version(const std::string& s) {
131 int major, minor;
132 int ret = sscanf(s.c_str(), "%d.%d", &major, &minor);
133 if (ret < 0) {
134 return ret;
135 }
136 version = std::make_pair(major,minor);
137 return 0;
138 }
139
140 void decode_json(JSONObj *obj) {
141 std::string s;
142 JSONDecoder::decode_json("number",s,obj);
143 if (parse_version(s) < 0)
144 throw JSONDecoder::err("Failed to parse ElasticVersion");
145 }
146 };
147
148
149 void ESInfo::decode_json(JSONObj *obj)
150 {
151 JSONDecoder::decode_json("name", name, obj);
152 JSONDecoder::decode_json("cluster_name", cluster_name, obj);
153 JSONDecoder::decode_json("cluster_uuid", cluster_uuid, obj);
154 es_version_decoder esv;
155 JSONDecoder::decode_json("version", esv, obj);
156 version = std::move(esv.version);
157 }
158
159 struct ElasticConfig {
160 uint64_t sync_instance{0};
161 string id;
162 string index_path;
163 std::unique_ptr<RGWRESTConn> conn;
164 bool explicit_custom_meta{true};
165 string override_index_path;
166 ItemList index_buckets;
167 ItemList allow_owners;
168 uint32_t num_shards{0};
169 uint32_t num_replicas{0};
170 std::map <string,string> default_headers = {{ "Content-Type", "application/json" }};
171
172 void init(CephContext *cct, const map<string, string, ltstr_nocase>& config) {
173 string elastic_endpoint = rgw_conf_get(config, "endpoint", "");
174 id = string("elastic:") + elastic_endpoint;
175 conn.reset(new RGWRESTConn(cct, nullptr, id, { elastic_endpoint }));
176 explicit_custom_meta = rgw_conf_get_bool(config, "explicit_custom_meta", true);
177 index_buckets.init(rgw_conf_get(config, "index_buckets_list", ""), true); /* approve all buckets by default */
178 allow_owners.init(rgw_conf_get(config, "approved_owners_list", ""), true); /* approve all bucket owners by default */
179 override_index_path = rgw_conf_get(config, "override_index_path", "");
180 num_shards = rgw_conf_get_int(config, "num_shards", ES_NUM_SHARDS_DEFAULT);
181 if (num_shards < ES_NUM_SHARDS_MIN) {
182 num_shards = ES_NUM_SHARDS_MIN;
183 }
184 num_replicas = rgw_conf_get_int(config, "num_replicas", ES_NUM_REPLICAS_DEFAULT);
185 string user = rgw_conf_get(config, "username", "");
186 string pw = rgw_conf_get(config, "password", "");
187 if (!user.empty() && !pw.empty()) {
188 auto auth_string = user + ":" + pw;
189 default_headers.emplace("AUTHORIZATION", "Basic " + rgw::to_base64(auth_string));
190 }
191 }
192
193 void init_instance(RGWRealm& realm, uint64_t instance_id) {
194 sync_instance = instance_id;
195
196 if (!override_index_path.empty()) {
197 index_path = override_index_path;
198 return;
199 }
200
201 char buf[32];
202 snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF));
203
204 index_path = "/rgw-" + realm.get_name() + buf;
205 }
206
207 string get_index_path() {
208 return index_path;
209 }
210
211 map<string, string>& get_request_headers() {
212 return default_headers;
213 }
214
215 string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) {
216 return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
217 }
218
219 bool should_handle_operation(RGWBucketInfo& bucket_info) {
220 return index_buckets.exists(bucket_info.bucket.name) &&
221 allow_owners.exists(bucket_info.owner.to_str());
222 }
223 };
224
225 using ElasticConfigRef = std::shared_ptr<ElasticConfig>;
226
227 static const char *es_type_to_str(const ESType& t) {
228 switch (t) {
229 case ESType::String: return "string";
230 case ESType::Text: return "text";
231 case ESType::Keyword: return "keyword";
232 case ESType::Long: return "long";
233 case ESType::Integer: return "integer";
234 case ESType::Short: return "short";
235 case ESType::Byte: return "byte";
236 case ESType::Double: return "double";
237 case ESType::Float: return "float";
238 case ESType::Half_Float: return "half_float";
239 case ESType::Scaled_Float: return "scaled_float";
240 case ESType::Date: return "date";
241 case ESType::Boolean: return "boolean";
242 case ESType::Integer_Range: return "integer_range";
243 case ESType::Float_Range: return "float_range";
244 case ESType::Double_Range: return "date_range";
245 case ESType::Date_Range: return "date_range";
246 case ESType::Geo_Point: return "geo_point";
247 case ESType::Ip: return "ip";
248 default:
249 return "<unknown>";
250 }
251 }
252
253 struct es_type_v2 {
254 ESType estype;
255 const char *format{nullptr};
256 boost::optional<bool> analyzed;
257
258 es_type_v2(ESType et) : estype(et) {}
259
260 void dump(Formatter *f) const {
261 const char *type_str = es_type_to_str(estype);
262 encode_json("type", type_str, f);
263 if (format) {
264 encode_json("format", format, f);
265 }
266
267 auto is_analyzed = analyzed;
268
269 if (estype == ESType::String &&
270 !is_analyzed) {
271 is_analyzed = false;
272 }
273
274 if (is_analyzed) {
275 encode_json("index", (is_analyzed.value() ? "analyzed" : "not_analyzed"), f);
276 }
277 }
278 };
279
280 struct es_type_v5 {
281 ESType estype;
282 const char *format{nullptr};
283 boost::optional<bool> analyzed;
284 boost::optional<bool> index;
285
286 es_type_v5(ESType et) : estype(et) {}
287
288 void dump(Formatter *f) const {
289 ESType new_estype;
290 if (estype != ESType::String) {
291 new_estype = estype;
292 } else {
293 bool is_analyzed = analyzed.value_or(false);
294 new_estype = (is_analyzed ? ESType::Text : ESType::Keyword);
295 /* index = true; ... Not setting index=true, because that's the default,
296 * and dumping a boolean value *might* be a problem when backporting this
297 * because value might get quoted
298 */
299 }
300
301 const char *type_str = es_type_to_str(new_estype);
302 encode_json("type", type_str, f);
303 if (format) {
304 encode_json("format", format, f);
305 }
306 if (index) {
307 encode_json("index", index.value(), f);
308 }
309 }
310 };
311
/* Wraps a mapping-entry type (es_type_v2 / es_type_v5) with chainable
 * setters so callers can write est(type).set_format(...). */
template <class T>
struct es_type : public T {
  es_type(T t) : T(t) {}

  es_type& set_format(const char *f) {
    this->format = f;
    return *this;
  }

  es_type& set_analyzed(bool a) {
    this->analyzed = a;
    return *this;
  }
};
325
326 template <class T>
327 struct es_index_mappings {
328 ESType string_type {ESType::String};
329
330 es_type<T> est(ESType t) const {
331 return es_type<T>(t);
332 }
333
334 void dump_custom(const char *section, ESType type, const char *format, Formatter *f) const {
335 f->open_object_section(section);
336 ::encode_json("type", "nested", f);
337 f->open_object_section("properties");
338 encode_json("name", est(string_type), f);
339 encode_json("value", est(type).set_format(format), f);
340 f->close_section(); // entry
341 f->close_section(); // custom-string
342 }
343
344 void dump(Formatter *f) const {
345 f->open_object_section("object");
346 f->open_object_section("properties");
347 encode_json("bucket", est(string_type), f);
348 encode_json("name", est(string_type), f);
349 encode_json("instance", est(string_type), f);
350 encode_json("versioned_epoch", est(ESType::Long), f);
351 f->open_object_section("meta");
352 f->open_object_section("properties");
353 encode_json("cache_control", est(string_type), f);
354 encode_json("content_disposition", est(string_type), f);
355 encode_json("content_encoding", est(string_type), f);
356 encode_json("content_language", est(string_type), f);
357 encode_json("content_type", est(string_type), f);
358 encode_json("storage_class", est(string_type), f);
359 encode_json("etag", est(string_type), f);
360 encode_json("expires", est(string_type), f);
361 encode_json("mtime", est(ESType::Date)
362 .set_format("strict_date_optional_time||epoch_millis"), f);
363 encode_json("size", est(ESType::Long), f);
364 dump_custom("custom-string", string_type, nullptr, f);
365 dump_custom("custom-int", ESType::Long, nullptr, f);
366 dump_custom("custom-date", ESType::Date, "strict_date_optional_time||epoch_millis", f);
367 f->close_section(); // properties
368 f->close_section(); // meta
369 f->close_section(); // properties
370 f->close_section(); // object
371 }
372 };
373
374 struct es_index_settings {
375 uint32_t num_replicas;
376 uint32_t num_shards;
377
378 es_index_settings(uint32_t _replicas, uint32_t _shards) : num_replicas(_replicas), num_shards(_shards) {}
379
380 void dump(Formatter *f) const {
381 encode_json("number_of_replicas", num_replicas, f);
382 encode_json("number_of_shards", num_shards, f);
383 }
384 };
385
386 struct es_index_config_base {
387 virtual ~es_index_config_base() {}
388 virtual void dump(Formatter *f) const = 0;
389 };
390
391 template <class T>
392 struct es_index_config : public es_index_config_base {
393 es_index_settings settings;
394 es_index_mappings<T> mappings;
395
396 es_index_config(es_index_settings& _s) : settings(_s) {}
397
398 void dump(Formatter *f) const {
399 encode_json("settings", settings, f);
400 encode_json("mappings", mappings, f);
401 }
402 };
403
404 static bool is_sys_attr(const std::string& attr_name){
405 static constexpr std::initializer_list<const char*> rgw_sys_attrs = {RGW_ATTR_PG_VER,
406 RGW_ATTR_SOURCE_ZONE,
407 RGW_ATTR_ID_TAG,
408 RGW_ATTR_TEMPURL_KEY1,
409 RGW_ATTR_TEMPURL_KEY2,
410 RGW_ATTR_UNIX1,
411 RGW_ATTR_UNIX_KEY1
412 };
413
414 return std::find(rgw_sys_attrs.begin(), rgw_sys_attrs.end(), attr_name) != rgw_sys_attrs.end();
415 }
416
417 static size_t attr_len(const bufferlist& val)
418 {
419 size_t len = val.length();
420 if (len && val[len - 1] == '\0') {
421 --len;
422 }
423
424 return len;
425 }
426
427 struct es_obj_metadata {
428 CephContext *cct;
429 ElasticConfigRef es_conf;
430 RGWBucketInfo bucket_info;
431 rgw_obj_key key;
432 ceph::real_time mtime;
433 uint64_t size;
434 map<string, bufferlist> attrs;
435 uint64_t versioned_epoch;
436
437 es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info,
438 const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size,
439 map<string, bufferlist>& _attrs, uint64_t _versioned_epoch) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key),
440 mtime(_mtime), size(_size), attrs(std::move(_attrs)), versioned_epoch(_versioned_epoch) {}
441
442 void dump(Formatter *f) const {
443 map<string, string> out_attrs;
444 map<string, string> custom_meta;
445 RGWAccessControlPolicy policy;
446 set<string> permissions;
447 RGWObjTags obj_tags;
448
449 for (auto i : attrs) {
450 const string& attr_name = i.first;
451 bufferlist& val = i.second;
452
453 if (!boost::algorithm::starts_with(attr_name, RGW_ATTR_PREFIX)) {
454 continue;
455 }
456
457 if (boost::algorithm::starts_with(attr_name, RGW_ATTR_META_PREFIX)) {
458 custom_meta.emplace(attr_name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1),
459 string(val.c_str(), attr_len(val)));
460 continue;
461 }
462
463 if (boost::algorithm::starts_with(attr_name, RGW_ATTR_CRYPT_PREFIX)) {
464 continue;
465 }
466
467 if (boost::algorithm::starts_with(attr_name, RGW_ATTR_OLH_PREFIX)) {
468 // skip versioned object olh info
469 continue;
470 }
471
472 if (attr_name == RGW_ATTR_ACL) {
473 try {
474 auto i = val.begin();
475 ::decode(policy, i);
476 } catch (buffer::error& err) {
477 ldout(cct, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl;
478 continue;
479 }
480
481 const RGWAccessControlList& acl = policy.get_acl();
482
483 permissions.insert(policy.get_owner().get_id().to_str());
484 for (auto acliter : acl.get_grant_map()) {
485 const ACLGrant& grant = acliter.second;
486 if (grant.get_type().get_type() == ACL_TYPE_CANON_USER &&
487 ((uint32_t)grant.get_permission().get_permissions() & RGW_PERM_READ) != 0) {
488 rgw_user user;
489 if (grant.get_id(user)) {
490 permissions.insert(user.to_str());
491 }
492 }
493 }
494 } else if (attr_name == RGW_ATTR_TAGS) {
495 try {
496 auto tags_bl = val.begin();
497 ::decode(obj_tags, tags_bl);
498 } catch (buffer::error& err) {
499 ldout(cct,0) << "ERROR: failed to decode obj tags for "
500 << bucket_info.bucket << "/" << key << dendl;
501 continue;
502 }
503 } else if (attr_name == RGW_ATTR_COMPRESSION) {
504 RGWCompressionInfo cs_info;
505 try {
506 auto vals_bl = val.begin();
507 ::decode(cs_info, vals_bl);
508 } catch (buffer::error& err) {
509 ldout(cct,0) << "ERROR: failed to decode compression attr for "
510 << bucket_info.bucket << "/" << key << dendl;
511 continue;
512 }
513 out_attrs.emplace("compression",std::move(cs_info.compression_type));
514 } else {
515 if (!is_sys_attr(attr_name)) {
516 out_attrs.emplace(attr_name.substr(sizeof(RGW_ATTR_PREFIX) - 1),
517 std::string(val.c_str(), attr_len(val)));
518 }
519 }
520 }
521 ::encode_json("bucket", bucket_info.bucket.name, f);
522 ::encode_json("name", key.name, f);
523 string instance = key.instance;
524 if (instance.empty())
525 instance = "null";
526 ::encode_json("instance", instance, f);
527 ::encode_json("versioned_epoch", versioned_epoch, f);
528 ::encode_json("owner", policy.get_owner(), f);
529 ::encode_json("permissions", permissions, f);
530 f->open_object_section("meta");
531 ::encode_json("size", size, f);
532
533 string mtime_str;
534 rgw_to_iso8601(mtime, &mtime_str);
535 ::encode_json("mtime", mtime_str, f);
536 for (auto i : out_attrs) {
537 ::encode_json(i.first.c_str(), i.second, f);
538 }
539 map<string, string> custom_str;
540 map<string, string> custom_int;
541 map<string, string> custom_date;
542
543 for (auto i : custom_meta) {
544 auto config = bucket_info.mdsearch_config.find(i.first);
545 if (config == bucket_info.mdsearch_config.end()) {
546 if (!es_conf->explicit_custom_meta) {
547 /* default custom meta is of type string */
548 custom_str[i.first] = i.second;
549 } else {
550 ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl;
551 }
552 continue;
553 }
554 switch (config->second) {
555 case ESEntityTypeMap::ES_ENTITY_DATE:
556 custom_date[i.first] = i.second;
557 break;
558 case ESEntityTypeMap::ES_ENTITY_INT:
559 custom_int[i.first] = i.second;
560 break;
561 default:
562 custom_str[i.first] = i.second;
563 }
564 }
565
566 if (!custom_str.empty()) {
567 f->open_array_section("custom-string");
568 for (auto i : custom_str) {
569 f->open_object_section("entity");
570 ::encode_json("name", i.first.c_str(), f);
571 ::encode_json("value", i.second, f);
572 f->close_section();
573 }
574 f->close_section();
575 }
576 if (!custom_int.empty()) {
577 f->open_array_section("custom-int");
578 for (auto i : custom_int) {
579 f->open_object_section("entity");
580 ::encode_json("name", i.first.c_str(), f);
581 ::encode_json("value", i.second, f);
582 f->close_section();
583 }
584 f->close_section();
585 }
586 if (!custom_date.empty()) {
587 f->open_array_section("custom-date");
588 for (auto i : custom_date) {
589 /*
590 * try to exlicitly parse date field, otherwise elasticsearch could reject the whole doc,
591 * which will end up with failed sync
592 */
593 real_time t;
594 int r = parse_time(i.second.c_str(), &t);
595 if (r < 0) {
596 ldout(cct, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl;
597 continue;
598 }
599
600 string time_str;
601 rgw_to_iso8601(t, &time_str);
602
603 f->open_object_section("entity");
604 ::encode_json("name", i.first.c_str(), f);
605 ::encode_json("value", time_str.c_str(), f);
606 f->close_section();
607 }
608 f->close_section();
609 }
610 f->close_section(); // meta
611 const auto& m = obj_tags.get_tags();
612 if (m.size() > 0){
613 f->open_array_section("tagging");
614 for (const auto &it : m) {
615 f->open_object_section("tag");
616 ::encode_json("key", it.first, f);
617 ::encode_json("value",it.second, f);
618 f->close_section();
619 }
620 f->close_section(); // tagging
621 }
622 }
623 };
624
625 class RGWElasticInitConfigCBCR : public RGWCoroutine {
626 RGWDataSyncEnv *sync_env;
627 ElasticConfigRef conf;
628 ESInfo es_info;
629
630 struct _err_response {
631 struct err_reason {
632 vector<err_reason> root_cause;
633 string type;
634 string reason;
635 string index;
636
637 void decode_json(JSONObj *obj) {
638 JSONDecoder::decode_json("root_cause", root_cause, obj);
639 JSONDecoder::decode_json("type", type, obj);
640 JSONDecoder::decode_json("reason", reason, obj);
641 JSONDecoder::decode_json("index", index, obj);
642 }
643 } error;
644
645 void decode_json(JSONObj *obj) {
646 JSONDecoder::decode_json("error", error, obj);
647 }
648 } err_response;
649
650 public:
651 RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env,
652 ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
653 sync_env(_sync_env),
654 conf(_conf) {}
655 int operate() override {
656 reenter(this) {
657 ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl;
658 yield call(new RGWReadRESTResourceCR<ESInfo> (sync_env->cct,
659 conf->conn.get(),
660 sync_env->http_manager,
661 "/", nullptr /*params*/,
662 &(conf->default_headers),
663 &es_info));
664 if (retcode < 0) {
665 return set_cr_error(retcode);
666 }
667
668 yield {
669 string path = conf->get_index_path();
670 ldout(sync_env->cct, 5) << "got elastic version=" << es_info.get_version_str() << dendl;
671
672 es_index_settings settings(conf->num_replicas, conf->num_shards);
673
674 std::unique_ptr<es_index_config_base> index_conf;
675
676 if (es_info.version >= ES_V5) {
677 ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl;
678 index_conf.reset(new es_index_config<es_type_v5>(settings));
679 } else {
680 ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl;
681 index_conf.reset(new es_index_config<es_type_v2>(settings));
682 }
683 call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sync_env->cct,
684 conf->conn.get(),
685 sync_env->http_manager,
686 path, nullptr /*params*/,
687 &(conf->default_headers),
688 *index_conf, nullptr, &err_response));
689 }
690 if (retcode < 0) {
691 ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
692
693 if (err_response.error.type != "index_already_exists_exception") {
694 return set_cr_error(retcode);
695 }
696
697 ldout(sync_env->cct, 0) << "elasticsearch: index already exists, assuming external initialization" << dendl;
698 }
699 return set_cr_done();
700 }
701 return 0;
702 }
703
704 };
705
706 class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
707 ElasticConfigRef conf;
708 uint64_t versioned_epoch;
709 public:
710 RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
711 RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
712 ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf),
713 versioned_epoch(_versioned_epoch) {}
714 int operate() override {
715 reenter(this) {
716 ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
717 << " b=" << bucket_info.bucket << " k=" << key
718 << " size=" << size << " mtime=" << mtime << dendl;
719
720 yield {
721 string path = conf->get_obj_path(bucket_info, key);
722 es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch);
723
724 call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
725 sync_env->http_manager,
726 path, nullptr /* params */,
727 &(conf->default_headers),
728 doc, nullptr /* result */));
729
730 }
731 if (retcode < 0) {
732 return set_cr_error(retcode);
733 }
734 return set_cr_done();
735 }
736 return 0;
737 }
738 };
739
740 class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
741 ElasticConfigRef conf;
742 uint64_t versioned_epoch;
743 public:
744 RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
745 RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
746 ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
747 conf(_conf), versioned_epoch(_versioned_epoch) {
748 }
749
750 ~RGWElasticHandleRemoteObjCR() override {}
751
752 RGWStatRemoteObjCBCR *allocate_callback() override {
753 return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch);
754 }
755 };
756
757 class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine {
758 RGWDataSyncEnv *sync_env;
759 RGWBucketInfo bucket_info;
760 rgw_obj_key key;
761 ceph::real_time mtime;
762 ElasticConfigRef conf;
763 public:
764 RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
765 RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
766 ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
767 bucket_info(_bucket_info), key(_key),
768 mtime(_mtime), conf(_conf) {}
769 int operate() override {
770 reenter(this) {
771 ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
772 << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
773 yield {
774 string path = conf->get_obj_path(bucket_info, key);
775
776 call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
777 sync_env->http_manager,
778 path, nullptr /* params */));
779 }
780 if (retcode < 0) {
781 return set_cr_error(retcode);
782 }
783 return set_cr_done();
784 }
785 return 0;
786 }
787
788 };
789
790 class RGWElasticDataSyncModule : public RGWDataSyncModule {
791 ElasticConfigRef conf;
792 public:
793 RGWElasticDataSyncModule(CephContext *cct, const map<string, string, ltstr_nocase>& config) : conf(std::make_shared<ElasticConfig>()) {
794 conf->init(cct, config);
795 }
796 ~RGWElasticDataSyncModule() override {}
797
798 void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
799 conf->init_instance(sync_env->store->get_realm(), instance_id);
800 }
801
802 RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
803 ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
804 return new RGWElasticInitConfigCBCR(sync_env, conf);
805 }
806 RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, boost::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
807 ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
808 if (!conf->should_handle_operation(bucket_info)) {
809 ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
810 return nullptr;
811 }
812 return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch.value_or(0));
813 }
814 RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
815 /* versioned and versioned epoch params are useless in the elasticsearch backend case */
816 ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
817 if (!conf->should_handle_operation(bucket_info)) {
818 ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
819 return nullptr;
820 }
821 return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
822 }
823 RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
824 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
825 ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
826 << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
827 ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
828 return NULL;
829 }
830 RGWRESTConn *get_rest_conn() {
831 return conf->conn.get();
832 }
833
834 string get_index_path() {
835 return conf->get_index_path();
836 }
837
838 map<string, string>& get_request_headers() {
839 return conf->get_request_headers();
840 }
841 };
842
843 RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const map<string, string, ltstr_nocase>& config)
844 {
845 data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, config));
846 }
847
848 RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler()
849 {
850 return data_handler.get();
851 }
852
853 RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn()
854 {
855 return data_handler->get_rest_conn();
856 }
857
858 string RGWElasticSyncModuleInstance::get_index_path() {
859 return data_handler->get_index_path();
860 }
861
862 map<string, string>& RGWElasticSyncModuleInstance::get_request_headers() {
863 return data_handler->get_request_headers();
864 }
865
866 RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
867 if (dialect != RGW_REST_S3) {
868 return orig;
869 }
870 delete orig;
871 return new RGWRESTMgr_MDSearch_S3();
872 }
873
874 int RGWElasticSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) {
875 string endpoint;
876 auto i = config.find("endpoint");
877 if (i != config.end()) {
878 endpoint = i->second;
879 }
880 instance->reset(new RGWElasticSyncModuleInstance(cct, config));
881 return 0;
882 }
883