]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_sync_module_es.cc
d1953193f95622bb49d6b732dc2e1e31c19addec
[ceph.git] / ceph / src / rgw / rgw_sync_module_es.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
#include "rgw_b64.h"
#include "rgw_common.h"
#include "rgw_coroutine.h"
#include "rgw_sync_module.h"
#include "rgw_data_sync.h"
#include "rgw_sync_module_es.h"
#include "rgw_sync_module_es_rest.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rest.h"
#include "rgw_op.h"
#include "rgw_es_query.h"
#include "rgw_zone.h"

#include "services/svc_zone.h"

#include "include/str_list.h"

#include <cerrno>

// must stay last: defines the reenter/yield coroutine macros
#include <boost/asio/yield.hpp>
22
23 #define dout_subsys ceph_subsys_rgw
24
25
26 /*
27 * allowlist utility. Config string is a list of entries, where an entry is either an item,
28 * a prefix, or a suffix. An item would be the name of the entity that we'd look up,
29 * a prefix would be a string ending with an asterisk, a suffix would be a string starting
30 * with an asterisk. For example:
31 *
32 * bucket1, bucket2, foo*, *bar
33 */
34 class ItemList {
35 bool approve_all{false};
36
37 set<string> entries;
38 set<string> prefixes;
39 set<string> suffixes;
40
41 void parse(const string& str) {
42 list<string> l;
43
44 get_str_list(str, ",", l);
45
46 for (auto& entry : l) {
47 entry = rgw_trim_whitespace(entry);
48 if (entry.empty()) {
49 continue;
50 }
51
52 if (entry == "*") {
53 approve_all = true;
54 return;
55 }
56
57 if (entry[0] == '*') {
58 suffixes.insert(entry.substr(1));
59 continue;
60 }
61
62 if (entry.back() == '*') {
63 prefixes.insert(entry.substr(0, entry.size() - 1));
64 continue;
65 }
66
67 entries.insert(entry);
68 }
69 }
70
71 public:
72 ItemList() {}
73 void init(const string& str, bool def_val) {
74 if (str.empty()) {
75 approve_all = def_val;
76 } else {
77 parse(str);
78 }
79 }
80
81 bool exists(const string& entry) {
82 if (approve_all) {
83 return true;
84 }
85
86 if (entries.find(entry) != entries.end()) {
87 return true;
88 }
89
90 auto i = prefixes.upper_bound(entry);
91 if (i != prefixes.begin()) {
92 --i;
93 if (boost::algorithm::starts_with(entry, *i)) {
94 return true;
95 }
96 }
97
98 for (i = suffixes.begin(); i != suffixes.end(); ++i) {
99 if (boost::algorithm::ends_with(entry, *i)) {
100 return true;
101 }
102 }
103
104 return false;
105 }
106 };
107
/* Lower bound enforced on the configured "num_shards" value (see ElasticConfig::init). */
#define ES_NUM_SHARDS_MIN 5

/* Defaults used when "num_shards" / "num_replicas" are absent from the config. */
#define ES_NUM_SHARDS_DEFAULT 16
#define ES_NUM_REPLICAS_DEFAULT 1

/* Elasticsearch (major, minor) version; std::pair compares lexicographically,
 * so e.g. (7,10) >= ES_V7 and (6,8) < ES_V7. */
using ESVersion = std::pair<int,int>;
static constexpr ESVersion ES_V5{5,0};
static constexpr ESVersion ES_V7{7,0};
116
117 struct ESInfo {
118 std::string name;
119 std::string cluster_name;
120 std::string cluster_uuid;
121 ESVersion version;
122
123 void decode_json(JSONObj *obj);
124
125 std::string get_version_str(){
126 return std::to_string(version.first) + "." + std::to_string(version.second);
127 }
128 };
129
130 // simple wrapper structure to wrap the es version nested type
131 struct es_version_decoder {
132 ESVersion version;
133
134 int parse_version(const std::string& s) {
135 int major, minor;
136 int ret = sscanf(s.c_str(), "%d.%d", &major, &minor);
137 if (ret < 0) {
138 return ret;
139 }
140 version = std::make_pair(major,minor);
141 return 0;
142 }
143
144 void decode_json(JSONObj *obj) {
145 std::string s;
146 JSONDecoder::decode_json("number",s,obj);
147 if (parse_version(s) < 0)
148 throw JSONDecoder::err("Failed to parse ElasticVersion");
149 }
150 };
151
152
153 void ESInfo::decode_json(JSONObj *obj)
154 {
155 JSONDecoder::decode_json("name", name, obj);
156 JSONDecoder::decode_json("cluster_name", cluster_name, obj);
157 JSONDecoder::decode_json("cluster_uuid", cluster_uuid, obj);
158 es_version_decoder esv;
159 JSONDecoder::decode_json("version", esv, obj);
160 version = std::move(esv.version);
161 }
162
163 struct ElasticConfig {
164 uint64_t sync_instance{0};
165 string id;
166 string index_path;
167 std::unique_ptr<RGWRESTConn> conn;
168 bool explicit_custom_meta{true};
169 string override_index_path;
170 ItemList index_buckets;
171 ItemList allow_owners;
172 uint32_t num_shards{0};
173 uint32_t num_replicas{0};
174 std::map <string,string> default_headers = {{ "Content-Type", "application/json" }};
175 ESInfo es_info;
176
177 void init(CephContext *cct, const JSONFormattable& config) {
178 string elastic_endpoint = config["endpoint"];
179 id = string("elastic:") + elastic_endpoint;
180 conn.reset(new RGWRESTConn(cct, nullptr, id, { elastic_endpoint }));
181 explicit_custom_meta = config["explicit_custom_meta"](true);
182 index_buckets.init(config["index_buckets_list"], true); /* approve all buckets by default */
183 allow_owners.init(config["approved_owners_list"], true); /* approve all bucket owners by default */
184 override_index_path = config["override_index_path"];
185 num_shards = config["num_shards"](ES_NUM_SHARDS_DEFAULT);
186 if (num_shards < ES_NUM_SHARDS_MIN) {
187 num_shards = ES_NUM_SHARDS_MIN;
188 }
189 num_replicas = config["num_replicas"](ES_NUM_REPLICAS_DEFAULT);
190 if (string user = config["username"], pw = config["password"];
191 !user.empty() && !pw.empty()) {
192 auto auth_string = user + ":" + pw;
193 default_headers.emplace("AUTHORIZATION", "Basic " + rgw::to_base64(auth_string));
194 }
195
196 }
197
198 void init_instance(const RGWRealm& realm, uint64_t instance_id) {
199 sync_instance = instance_id;
200
201 if (!override_index_path.empty()) {
202 index_path = override_index_path;
203 return;
204 }
205
206 char buf[32];
207 snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF));
208
209 index_path = "/rgw-" + realm.get_name() + buf;
210 }
211
212 string get_index_path() {
213 return index_path;
214 }
215
216 map<string, string>& get_request_headers() {
217 return default_headers;
218 }
219
220 string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) {
221 if (es_info.version >= ES_V7) {
222 return index_path+ "/_doc/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
223 ;
224 } else {
225 return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
226 }
227 }
228
229 bool should_handle_operation(RGWBucketInfo& bucket_info) {
230 return index_buckets.exists(bucket_info.bucket.name) &&
231 allow_owners.exists(bucket_info.owner.to_str());
232 }
233 };
234
235 using ElasticConfigRef = std::shared_ptr<ElasticConfig>;
236
237 static const char *es_type_to_str(const ESType& t) {
238 switch (t) {
239 case ESType::String: return "string";
240 case ESType::Text: return "text";
241 case ESType::Keyword: return "keyword";
242 case ESType::Long: return "long";
243 case ESType::Integer: return "integer";
244 case ESType::Short: return "short";
245 case ESType::Byte: return "byte";
246 case ESType::Double: return "double";
247 case ESType::Float: return "float";
248 case ESType::Half_Float: return "half_float";
249 case ESType::Scaled_Float: return "scaled_float";
250 case ESType::Date: return "date";
251 case ESType::Boolean: return "boolean";
252 case ESType::Integer_Range: return "integer_range";
253 case ESType::Float_Range: return "float_range";
254 case ESType::Double_Range: return "date_range";
255 case ESType::Date_Range: return "date_range";
256 case ESType::Geo_Point: return "geo_point";
257 case ESType::Ip: return "ip";
258 default:
259 return "<unknown>";
260 }
261 }
262
263 struct es_type_v2 {
264 ESType estype;
265 const char *format{nullptr};
266 std::optional<bool> analyzed;
267
268 es_type_v2(ESType et) : estype(et) {}
269
270 void dump(Formatter *f) const {
271 const char *type_str = es_type_to_str(estype);
272 encode_json("type", type_str, f);
273 if (format) {
274 encode_json("format", format, f);
275 }
276
277 auto is_analyzed = analyzed;
278
279 if (estype == ESType::String &&
280 !is_analyzed) {
281 is_analyzed = false;
282 }
283
284 if (is_analyzed) {
285 encode_json("index", (is_analyzed.value() ? "analyzed" : "not_analyzed"), f);
286 }
287 }
288 };
289
290 struct es_type_v5 {
291 ESType estype;
292 const char *format{nullptr};
293 std::optional<bool> analyzed;
294 std::optional<bool> index;
295
296 es_type_v5(ESType et) : estype(et) {}
297
298 void dump(Formatter *f) const {
299 ESType new_estype;
300 if (estype != ESType::String) {
301 new_estype = estype;
302 } else {
303 bool is_analyzed = analyzed.value_or(false);
304 new_estype = (is_analyzed ? ESType::Text : ESType::Keyword);
305 /* index = true; ... Not setting index=true, because that's the default,
306 * and dumping a boolean value *might* be a problem when backporting this
307 * because value might get quoted
308 */
309 }
310
311 const char *type_str = es_type_to_str(new_estype);
312 encode_json("type", type_str, f);
313 if (format) {
314 encode_json("format", format, f);
315 }
316 if (index) {
317 encode_json("index", index.value(), f);
318 }
319 }
320 };
321
/* Wraps a mapping flavour (es_type_v2 / es_type_v5) with chainable setters so
 * field mappings can be built inline, e.g. est(t).set_format("..."). */
template <class T>
struct es_type : public T {
  es_type(T t) : T(t) {}

  es_type& set_format(const char *f) {
    this->format = f;
    return *this;
  }

  es_type& set_analyzed(bool a) {
    this->analyzed = a;
    return *this;
  }
};
335
336 template <class T>
337 struct es_index_mappings {
338 ESVersion es_version;
339 ESType string_type {ESType::String};
340
341 es_index_mappings(ESVersion esv):es_version(esv) {
342 }
343
344 es_type<T> est(ESType t) const {
345 return es_type<T>(t);
346 }
347
348 void dump_custom(const char *section, ESType type, const char *format, Formatter *f) const {
349 f->open_object_section(section);
350 ::encode_json("type", "nested", f);
351 f->open_object_section("properties");
352 encode_json("name", est(string_type), f);
353 encode_json("value", est(type).set_format(format), f);
354 f->close_section(); // entry
355 f->close_section(); // custom-string
356 }
357
358 void dump(Formatter *f) const {
359 if (es_version <= ES_V7)
360 f->open_object_section("object");
361 f->open_object_section("properties");
362 encode_json("bucket", est(string_type), f);
363 encode_json("name", est(string_type), f);
364 encode_json("instance", est(string_type), f);
365 encode_json("versioned_epoch", est(ESType::Long), f);
366 f->open_object_section("meta");
367 f->open_object_section("properties");
368 encode_json("cache_control", est(string_type), f);
369 encode_json("content_disposition", est(string_type), f);
370 encode_json("content_encoding", est(string_type), f);
371 encode_json("content_language", est(string_type), f);
372 encode_json("content_type", est(string_type), f);
373 encode_json("storage_class", est(string_type), f);
374 encode_json("etag", est(string_type), f);
375 encode_json("expires", est(string_type), f);
376 encode_json("mtime", est(ESType::Date)
377 .set_format("strict_date_optional_time||epoch_millis"), f);
378 encode_json("size", est(ESType::Long), f);
379 dump_custom("custom-string", string_type, nullptr, f);
380 dump_custom("custom-int", ESType::Long, nullptr, f);
381 dump_custom("custom-date", ESType::Date, "strict_date_optional_time||epoch_millis", f);
382 f->close_section(); // properties
383 f->close_section(); // meta
384 f->close_section(); // properties
385
386 if (es_version <= ES_V7)
387 f->close_section(); // object
388 }
389 };
390
391 struct es_index_settings {
392 uint32_t num_replicas;
393 uint32_t num_shards;
394
395 es_index_settings(uint32_t _replicas, uint32_t _shards) : num_replicas(_replicas), num_shards(_shards) {}
396
397 void dump(Formatter *f) const {
398 encode_json("number_of_replicas", num_replicas, f);
399 encode_json("number_of_shards", num_shards, f);
400 }
401 };
402
403 struct es_index_config_base {
404 virtual ~es_index_config_base() {}
405 virtual void dump(Formatter *f) const = 0;
406 };
407
408 template <class T>
409 struct es_index_config : public es_index_config_base {
410 es_index_settings settings;
411 es_index_mappings<T> mappings;
412
413 es_index_config(es_index_settings& _s, ESVersion esv) : settings(_s), mappings(esv) {
414 }
415
416 void dump(Formatter *f) const {
417 encode_json("settings", settings, f);
418 encode_json("mappings", mappings, f);
419 }
420 };
421
422 static bool is_sys_attr(const std::string& attr_name){
423 static constexpr std::initializer_list<const char*> rgw_sys_attrs =
424 {RGW_ATTR_PG_VER,
425 RGW_ATTR_SOURCE_ZONE,
426 RGW_ATTR_ID_TAG,
427 RGW_ATTR_TEMPURL_KEY1,
428 RGW_ATTR_TEMPURL_KEY2,
429 RGW_ATTR_UNIX1,
430 RGW_ATTR_UNIX_KEY1
431 };
432
433 return std::find(rgw_sys_attrs.begin(), rgw_sys_attrs.end(), attr_name) != rgw_sys_attrs.end();
434 }
435
436 static size_t attr_len(const bufferlist& val)
437 {
438 size_t len = val.length();
439 if (len && val[len - 1] == '\0') {
440 --len;
441 }
442
443 return len;
444 }
445
446 struct es_obj_metadata {
447 CephContext *cct;
448 ElasticConfigRef es_conf;
449 RGWBucketInfo bucket_info;
450 rgw_obj_key key;
451 ceph::real_time mtime;
452 uint64_t size;
453 map<string, bufferlist> attrs;
454 uint64_t versioned_epoch;
455
456 es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info,
457 const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size,
458 map<string, bufferlist>& _attrs, uint64_t _versioned_epoch) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key),
459 mtime(_mtime), size(_size), attrs(std::move(_attrs)), versioned_epoch(_versioned_epoch) {}
460
461 void dump(Formatter *f) const {
462 map<string, string> out_attrs;
463 map<string, string> custom_meta;
464 RGWAccessControlPolicy policy;
465 set<string> permissions;
466 RGWObjTags obj_tags;
467
468 for (auto i : attrs) {
469 const string& attr_name = i.first;
470 bufferlist& val = i.second;
471
472 if (!boost::algorithm::starts_with(attr_name, RGW_ATTR_PREFIX)) {
473 continue;
474 }
475
476 if (boost::algorithm::starts_with(attr_name, RGW_ATTR_META_PREFIX)) {
477 custom_meta.emplace(attr_name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1),
478 string(val.c_str(), attr_len(val)));
479 continue;
480 }
481
482 if (boost::algorithm::starts_with(attr_name, RGW_ATTR_CRYPT_PREFIX)) {
483 continue;
484 }
485
486 if (boost::algorithm::starts_with(attr_name, RGW_ATTR_OLH_PREFIX)) {
487 // skip versioned object olh info
488 continue;
489 }
490
491 if (attr_name == RGW_ATTR_ACL) {
492 try {
493 auto i = val.cbegin();
494 decode(policy, i);
495 } catch (buffer::error& err) {
496 ldout(cct, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl;
497 continue;
498 }
499
500 const RGWAccessControlList& acl = policy.get_acl();
501
502 permissions.insert(policy.get_owner().get_id().to_str());
503 for (auto acliter : acl.get_grant_map()) {
504 const ACLGrant& grant = acliter.second;
505 if (grant.get_type().get_type() == ACL_TYPE_CANON_USER &&
506 ((uint32_t)grant.get_permission().get_permissions() & RGW_PERM_READ) != 0) {
507 rgw_user user;
508 if (grant.get_id(user)) {
509 permissions.insert(user.to_str());
510 }
511 }
512 }
513 } else if (attr_name == RGW_ATTR_TAGS) {
514 try {
515 auto tags_bl = val.cbegin();
516 decode(obj_tags, tags_bl);
517 } catch (buffer::error& err) {
518 ldout(cct,0) << "ERROR: failed to decode obj tags for "
519 << bucket_info.bucket << "/" << key << dendl;
520 continue;
521 }
522 } else if (attr_name == RGW_ATTR_COMPRESSION) {
523 RGWCompressionInfo cs_info;
524 try {
525 auto vals_bl = val.cbegin();
526 decode(cs_info, vals_bl);
527 } catch (buffer::error& err) {
528 ldout(cct,0) << "ERROR: failed to decode compression attr for "
529 << bucket_info.bucket << "/" << key << dendl;
530 continue;
531 }
532 out_attrs.emplace("compression",std::move(cs_info.compression_type));
533 } else {
534 if (!is_sys_attr(attr_name)) {
535 out_attrs.emplace(attr_name.substr(sizeof(RGW_ATTR_PREFIX) - 1),
536 std::string(val.c_str(), attr_len(val)));
537 }
538 }
539 }
540 ::encode_json("bucket", bucket_info.bucket.name, f);
541 ::encode_json("name", key.name, f);
542 string instance = key.instance;
543 if (instance.empty())
544 instance = "null";
545 ::encode_json("instance", instance, f);
546 ::encode_json("versioned_epoch", versioned_epoch, f);
547 ::encode_json("owner", policy.get_owner(), f);
548 ::encode_json("permissions", permissions, f);
549 f->open_object_section("meta");
550 ::encode_json("size", size, f);
551
552 string mtime_str;
553 rgw_to_iso8601(mtime, &mtime_str);
554 ::encode_json("mtime", mtime_str, f);
555 for (auto i : out_attrs) {
556 ::encode_json(i.first.c_str(), i.second, f);
557 }
558 map<string, string> custom_str;
559 map<string, string> custom_int;
560 map<string, string> custom_date;
561
562 for (auto i : custom_meta) {
563 auto config = bucket_info.mdsearch_config.find(i.first);
564 if (config == bucket_info.mdsearch_config.end()) {
565 if (!es_conf->explicit_custom_meta) {
566 /* default custom meta is of type string */
567 custom_str[i.first] = i.second;
568 } else {
569 ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl;
570 }
571 continue;
572 }
573 switch (config->second) {
574 case ESEntityTypeMap::ES_ENTITY_DATE:
575 custom_date[i.first] = i.second;
576 break;
577 case ESEntityTypeMap::ES_ENTITY_INT:
578 custom_int[i.first] = i.second;
579 break;
580 default:
581 custom_str[i.first] = i.second;
582 }
583 }
584
585 if (!custom_str.empty()) {
586 f->open_array_section("custom-string");
587 for (auto i : custom_str) {
588 f->open_object_section("entity");
589 ::encode_json("name", i.first.c_str(), f);
590 ::encode_json("value", i.second, f);
591 f->close_section();
592 }
593 f->close_section();
594 }
595 if (!custom_int.empty()) {
596 f->open_array_section("custom-int");
597 for (auto i : custom_int) {
598 f->open_object_section("entity");
599 ::encode_json("name", i.first.c_str(), f);
600 ::encode_json("value", i.second, f);
601 f->close_section();
602 }
603 f->close_section();
604 }
605 if (!custom_date.empty()) {
606 f->open_array_section("custom-date");
607 for (auto i : custom_date) {
608 /*
609 * try to exlicitly parse date field, otherwise elasticsearch could reject the whole doc,
610 * which will end up with failed sync
611 */
612 real_time t;
613 int r = parse_time(i.second.c_str(), &t);
614 if (r < 0) {
615 ldout(cct, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl;
616 continue;
617 }
618
619 string time_str;
620 rgw_to_iso8601(t, &time_str);
621
622 f->open_object_section("entity");
623 ::encode_json("name", i.first.c_str(), f);
624 ::encode_json("value", time_str.c_str(), f);
625 f->close_section();
626 }
627 f->close_section();
628 }
629 f->close_section(); // meta
630 const auto& m = obj_tags.get_tags();
631 if (m.size() > 0){
632 f->open_array_section("tagging");
633 for (const auto &it : m) {
634 f->open_object_section("tag");
635 ::encode_json("key", it.first, f);
636 ::encode_json("value",it.second, f);
637 f->close_section();
638 }
639 f->close_section(); // tagging
640 }
641 }
642 };
643
/*
 * Coroutine that GETs the Elasticsearch root path ("/") and decodes the
 * response into conf->es_info. conf->es_info.version later selects the mapping
 * flavour (RGWElasticPutIndexCBCR) and the document path (get_obj_path).
 */
class RGWElasticGetESInfoCBCR : public RGWCoroutine {
public:
  RGWElasticGetESInfoCBCR(RGWDataSyncCtx *_sc,
                          ElasticConfigRef _conf) : RGWCoroutine(_sc->cct),
                                                    sc(_sc), sync_env(_sc->env),
                                                    conf(_conf) {}
  int operate() override {
    reenter(this) {
      ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch info for zone: " << sc->source_zone << dendl;
      // Sent with conf->default_headers (content type and, if configured, basic auth).
      yield call(new RGWReadRESTResourceCR<ESInfo> (sync_env->cct,
                                                    conf->conn.get(),
                                                    sync_env->http_manager,
                                                    "/", nullptr /*params*/,
                                                    &(conf->default_headers),
                                                    &(conf->es_info)));
      if (retcode < 0) {
        ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch failed: " << retcode << dendl;
        return set_cr_error(retcode);
      }

      ldout(sync_env->cct, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl;
      return set_cr_done();
    }
    return 0;
  }
private:
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  ElasticConfigRef conf;
};
674
/*
 * Coroutine that creates the sync index with PUT <index_path>. The body holds
 * the shard/replica settings and field mappings. es_type_v5 mappings are used
 * when conf->es_info.version >= ES_V5, es_type_v2 otherwise. An
 * "already exists" error response is treated as success, assuming the index
 * was initialized externally.
 */
class RGWElasticPutIndexCBCR : public RGWCoroutine {
public:
  RGWElasticPutIndexCBCR(RGWDataSyncCtx *_sc,
                         ElasticConfigRef _conf) : RGWCoroutine(_sc->cct),
                                                   sc(_sc), sync_env(_sc->env),
                                                   conf(_conf) {}
  int operate() override {
    reenter(this) {
      ldout(sc->cct, 5) << conf->id << ": put elasticsearch index for zone: " << sc->source_zone << dendl;

      yield {
        string path = conf->get_index_path();
        es_index_settings settings(conf->num_replicas, conf->num_shards);
        std::unique_ptr<es_index_config_base> index_conf;

        if (conf->es_info.version >= ES_V5) {
          ldout(sc->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl;
          index_conf.reset(new es_index_config<es_type_v5>(settings, conf->es_info.version));
        } else {
          ldout(sc->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl;
          index_conf.reset(new es_index_config<es_type_v2>(settings, conf->es_info.version));
        }
        // NOTE(review): index_conf is destroyed when this yield block exits,
        // while the request coroutine receives *index_conf by reference. This
        // relies on RGWPutRESTResourceCR serializing its input when it is
        // constructed. Confirm.
        call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sc->cct,
                                                                                 conf->conn.get(),
                                                                                 sync_env->http_manager,
                                                                                 path, nullptr /*params*/,
                                                                                 &(conf->default_headers),
                                                                                 *index_conf, nullptr, &err_response));
      }
      if (retcode < 0) {

        // Either error type means the index is already there (presumably the
        // name differs between Elasticsearch versions; TODO confirm).
        if (err_response.error.type != "index_already_exists_exception" &&
            err_response.error.type != "resource_already_exists_exception") {
          ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
          return set_cr_error(retcode);
        }

        ldout(sync_env->cct, 0) << "elasticsearch: index already exists, assuming external initialization" << dendl;
      }
      return set_cr_done();
    }
    return 0;
  }

private:
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  ElasticConfigRef conf;

  /* Decoded error body of a failed PUT: {"error": {type, reason, index, root_cause[]}}. */
  struct _err_response {
    struct err_reason {
      vector<err_reason> root_cause;
      string type;
      string reason;
      string index;

      void decode_json(JSONObj *obj) {
        JSONDecoder::decode_json("root_cause", root_cause, obj);
        JSONDecoder::decode_json("type", type, obj);
        JSONDecoder::decode_json("reason", reason, obj);
        JSONDecoder::decode_json("index", index, obj);
      }
    } error;

    void decode_json(JSONObj *obj) {
      JSONDecoder::decode_json("error", error, obj);
    }
  } err_response;
};
744
/*
 * Sync initialization: first fetch the ES info (version), then create the
 * index. The version must be known before the index can be created, because
 * the mapping flavour depends on it. Fails with the first error encountered.
 */
class RGWElasticInitConfigCBCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  ElasticConfigRef conf;

public:
  RGWElasticInitConfigCBCR(RGWDataSyncCtx *_sc,
                          ElasticConfigRef _conf) : RGWCoroutine(_sc->cct),
                                                    sc(_sc), sync_env(_sc->env),
                                                    conf(_conf) {}
  int operate() override {
    reenter(this) {

      yield call(new RGWElasticGetESInfoCBCR(sc, conf));

      if (retcode < 0) {
        return set_cr_error(retcode);
      }

      yield call(new RGWElasticPutIndexCBCR(sc, conf));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }

};
774
/*
 * Callback run after the remote object has been stat'ed. `key`, `size`,
 * `mtime` and `attrs` are inherited from RGWStatRemoteObjCBCR. Builds an
 * es_obj_metadata document and PUTs it to conf->get_obj_path(...).
 */
class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
  rgw_bucket_sync_pipe sync_pipe;
  ElasticConfigRef conf;
  uint64_t versioned_epoch;
public:
  RGWElasticHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
                          rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
                          ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
                                                                               sync_pipe(_sync_pipe), conf(_conf),
                                                                               versioned_epoch(_versioned_epoch) {}
  int operate() override {
    reenter(this) {
      ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sc->source_zone
                               << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key
                               << " size=" << size << " mtime=" << mtime << dendl;

      yield {
        string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key);
        // es_obj_metadata's constructor moves `attrs` out of this coroutine.
        es_obj_metadata doc(sync_env->cct, conf, sync_pipe.dest_bucket_info, key, mtime, size, attrs, versioned_epoch);

        call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
                                                            sync_env->http_manager,
                                                            path, nullptr /* params */,
                                                            &(conf->default_headers),
                                                            doc, nullptr /* result */));

      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
810
811 class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
812 rgw_bucket_sync_pipe sync_pipe;
813 ElasticConfigRef conf;
814 uint64_t versioned_epoch;
815 public:
816 RGWElasticHandleRemoteObjCR(RGWDataSyncCtx *_sc,
817 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
818 ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
819 sync_pipe(_sync_pipe),
820 conf(_conf), versioned_epoch(_versioned_epoch) {
821 }
822
823 ~RGWElasticHandleRemoteObjCR() override {}
824
825 RGWStatRemoteObjCBCR *allocate_callback() override {
826 return new RGWElasticHandleRemoteObjCBCR(sc, sync_pipe, key, conf, versioned_epoch);
827 }
828 };
829
/*
 * Coroutine that deletes the object's document from the index
 * (DELETE conf->get_obj_path(...)).
 */
class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_bucket_sync_pipe sync_pipe;
  rgw_obj_key key;
  ceph::real_time mtime;  // only used for logging here
  ElasticConfigRef conf;
public:
  RGWElasticRemoveRemoteObjCBCR(RGWDataSyncCtx *_sc,
                          rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
                          ElasticConfigRef _conf) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
                                                    sync_pipe(_sync_pipe), key(_key),
                                                    mtime(_mtime), conf(_conf) {}
  int operate() override {
    reenter(this) {
      ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sc->source_zone
                               << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
      yield {
        string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key);

        // NOTE(review): unlike the PUT requests in this file, the DELETE is not
        // passed conf->default_headers. The Basic AUTHORIZATION header built
        // from username/password is therefore not sent here. Confirm whether
        // RGWDeleteRESTResourceCR can take headers.
        call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
                                         sync_env->http_manager,
                                         path, nullptr /* params */));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }

};
863
864 class RGWElasticDataSyncModule : public RGWDataSyncModule {
865 ElasticConfigRef conf;
866 public:
867 RGWElasticDataSyncModule(CephContext *cct, const JSONFormattable& config) : conf(std::make_shared<ElasticConfig>()) {
868 conf->init(cct, config);
869 }
870 ~RGWElasticDataSyncModule() override {}
871
872 void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
873 conf->init_instance(sc->env->svc->zone->get_realm(), instance_id);
874 }
875
876 RGWCoroutine *init_sync(RGWDataSyncCtx *sc) override {
877 ldout(sc->cct, 5) << conf->id << ": init" << dendl;
878 return new RGWElasticInitConfigCBCR(sc, conf);
879 }
880
881 RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override {
882 ldout(sc->cct, 5) << conf->id << ": start_sync" << dendl;
883 // try to get elastic search version
884 return new RGWElasticGetESInfoCBCR(sc, conf);
885 }
886
887 RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
888 ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
889 if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
890 ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
891 return nullptr;
892 }
893 return new RGWElasticHandleRemoteObjCR(sc, sync_pipe, key, conf, versioned_epoch.value_or(0));
894 }
895 RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
896 /* versioned and versioned epoch params are useless in the elasticsearch backend case */
897 ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
898 if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
899 ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
900 return nullptr;
901 }
902 return new RGWElasticRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, conf);
903 }
904 RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
905 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
906 ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
907 << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
908 ldout(sc->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
909 return NULL;
910 }
911 RGWRESTConn *get_rest_conn() {
912 return conf->conn.get();
913 }
914
915 string get_index_path() {
916 return conf->get_index_path();
917 }
918
919 map<string, string>& get_request_headers() {
920 return conf->get_request_headers();
921 }
922 };
923
924 RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
925 {
926 data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, config));
927 }
928
929 RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler()
930 {
931 return data_handler.get();
932 }
933
934 RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn()
935 {
936 return data_handler->get_rest_conn();
937 }
938
939 string RGWElasticSyncModuleInstance::get_index_path() {
940 return data_handler->get_index_path();
941 }
942
943 map<string, string>& RGWElasticSyncModuleInstance::get_request_headers() {
944 return data_handler->get_request_headers();
945 }
946
947 RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
948 if (dialect != RGW_REST_S3) {
949 return orig;
950 }
951 delete orig;
952 return new RGWRESTMgr_MDSearch_S3();
953 }
954
955 int RGWElasticSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
956 string endpoint = config["endpoint"];
957 instance->reset(new RGWElasticSyncModuleInstance(cct, config));
958 return 0;
959 }
960