]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_sync_module_es.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_sync_module_es.cc
CommitLineData
a8e16298
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "rgw_b64.h"
7c673cae
FG
5#include "rgw_common.h"
6#include "rgw_coroutine.h"
7#include "rgw_sync_module.h"
8#include "rgw_data_sync.h"
7c673cae 9#include "rgw_sync_module_es.h"
31f18b77 10#include "rgw_sync_module_es_rest.h"
7c673cae
FG
11#include "rgw_rest_conn.h"
12#include "rgw_cr_rest.h"
31f18b77
FG
13#include "rgw_op.h"
14#include "rgw_es_query.h"
11fdf7f2
TL
15#include "rgw_zone.h"
16
17#include "services/svc_zone.h"
31f18b77
FG
18
19#include "include/str_list.h"
20
21#include <boost/asio/yield.hpp>
7c673cae
FG
22
23#define dout_subsys ceph_subsys_rgw
24
31f18b77
FG
25
26/*
27 * whitelist utility. Config string is a list of entries, where an entry is either an item,
28 * a prefix, or a suffix. An item would be the name of the entity that we'd look up,
29 * a prefix would be a string ending with an asterisk, a suffix would be a string starting
30 * with an asterisk. For example:
31 *
32 * bucket1, bucket2, foo*, *bar
33 */
34class ItemList {
35 bool approve_all{false};
36
37 set<string> entries;
38 set<string> prefixes;
39 set<string> suffixes;
40
41 void parse(const string& str) {
42 list<string> l;
43
44 get_str_list(str, ",", l);
45
46 for (auto& entry : l) {
47 entry = rgw_trim_whitespace(entry);
48 if (entry.empty()) {
49 continue;
50 }
51
52 if (entry == "*") {
53 approve_all = true;
54 return;
55 }
56
57 if (entry[0] == '*') {
58 suffixes.insert(entry.substr(1));
59 continue;
60 }
61
62 if (entry.back() == '*') {
63 prefixes.insert(entry.substr(0, entry.size() - 1));
64 continue;
65 }
66
67 entries.insert(entry);
68 }
69 }
70
71public:
72 ItemList() {}
73 void init(const string& str, bool def_val) {
74 if (str.empty()) {
75 approve_all = def_val;
76 } else {
77 parse(str);
78 }
79 }
80
81 bool exists(const string& entry) {
82 if (approve_all) {
83 return true;
84 }
85
86 if (entries.find(entry) != entries.end()) {
87 return true;
88 }
89
90 auto i = prefixes.upper_bound(entry);
91 if (i != prefixes.begin()) {
92 --i;
93 if (boost::algorithm::starts_with(entry, *i)) {
94 return true;
95 }
96 }
97
98 for (i = suffixes.begin(); i != suffixes.end(); ++i) {
99 if (boost::algorithm::ends_with(entry, *i)) {
100 return true;
101 }
102 }
103
104 return false;
105 }
106};
107
// Elasticsearch index creation defaults. Typed constants instead of macros;
// kept as int so they select the same JSONFormattable default-value overload.
static constexpr int ES_NUM_SHARDS_MIN = 5;      // configured shard counts below this are raised to it

static constexpr int ES_NUM_SHARDS_DEFAULT = 16;
static constexpr int ES_NUM_REPLICAS_DEFAULT = 1;
112
a8e16298
TL
113using ESVersion = std::pair<int,int>;
114static constexpr ESVersion ES_V5{5,0};
115
116struct ESInfo {
117 std::string name;
118 std::string cluster_name;
119 std::string cluster_uuid;
120 ESVersion version;
121
122 void decode_json(JSONObj *obj);
123
124 std::string get_version_str(){
125 return std::to_string(version.first) + "." + std::to_string(version.second);
126 }
127};
128
129// simple wrapper structure to wrap the es version nested type
130struct es_version_decoder {
131 ESVersion version;
132
133 int parse_version(const std::string& s) {
134 int major, minor;
135 int ret = sscanf(s.c_str(), "%d.%d", &major, &minor);
136 if (ret < 0) {
137 return ret;
138 }
139 version = std::make_pair(major,minor);
140 return 0;
141 }
142
143 void decode_json(JSONObj *obj) {
144 std::string s;
145 JSONDecoder::decode_json("number",s,obj);
146 if (parse_version(s) < 0)
147 throw JSONDecoder::err("Failed to parse ElasticVersion");
148 }
149};
150
151
152void ESInfo::decode_json(JSONObj *obj)
153{
154 JSONDecoder::decode_json("name", name, obj);
155 JSONDecoder::decode_json("cluster_name", cluster_name, obj);
156 JSONDecoder::decode_json("cluster_uuid", cluster_uuid, obj);
157 es_version_decoder esv;
158 JSONDecoder::decode_json("version", esv, obj);
159 version = std::move(esv.version);
160}
161
7c673cae 162struct ElasticConfig {
31f18b77 163 uint64_t sync_instance{0};
7c673cae 164 string id;
31f18b77
FG
165 string index_path;
166 std::unique_ptr<RGWRESTConn> conn;
167 bool explicit_custom_meta{true};
168 string override_index_path;
169 ItemList index_buckets;
170 ItemList allow_owners;
171 uint32_t num_shards{0};
172 uint32_t num_replicas{0};
a8e16298 173 std::map <string,string> default_headers = {{ "Content-Type", "application/json" }};
31f18b77 174
11fdf7f2
TL
175 void init(CephContext *cct, const JSONFormattable& config) {
176 string elastic_endpoint = config["endpoint"];
31f18b77
FG
177 id = string("elastic:") + elastic_endpoint;
178 conn.reset(new RGWRESTConn(cct, nullptr, id, { elastic_endpoint }));
11fdf7f2
TL
179 explicit_custom_meta = config["explicit_custom_meta"](true);
180 index_buckets.init(config["index_buckets_list"], true); /* approve all buckets by default */
181 allow_owners.init(config["approved_owners_list"], true); /* approve all bucket owners by default */
182 override_index_path = config["override_index_path"];
183 num_shards = config["num_shards"](ES_NUM_SHARDS_DEFAULT);
31f18b77
FG
184 if (num_shards < ES_NUM_SHARDS_MIN) {
185 num_shards = ES_NUM_SHARDS_MIN;
186 }
11fdf7f2
TL
187 num_replicas = config["num_replicas"](ES_NUM_REPLICAS_DEFAULT);
188 if (string user = config["username"], pw = config["password"];
189 !user.empty() && !pw.empty()) {
a8e16298
TL
190 auto auth_string = user + ":" + pw;
191 default_headers.emplace("AUTHORIZATION", "Basic " + rgw::to_base64(auth_string));
192 }
11fdf7f2 193
31f18b77
FG
194 }
195
11fdf7f2 196 void init_instance(const RGWRealm& realm, uint64_t instance_id) {
31f18b77
FG
197 sync_instance = instance_id;
198
199 if (!override_index_path.empty()) {
200 index_path = override_index_path;
201 return;
202 }
203
204 char buf[32];
205 snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF));
206
207 index_path = "/rgw-" + realm.get_name() + buf;
208 }
209
210 string get_index_path() {
211 return index_path;
212 }
213
a8e16298
TL
214 map<string, string>& get_request_headers() {
215 return default_headers;
216 }
217
31f18b77 218 string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) {
91327a77 219 return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
31f18b77
FG
220 }
221
222 bool should_handle_operation(RGWBucketInfo& bucket_info) {
223 return index_buckets.exists(bucket_info.bucket.name) &&
224 allow_owners.exists(bucket_info.owner.to_str());
225 }
7c673cae
FG
226};
227
31f18b77
FG
228using ElasticConfigRef = std::shared_ptr<ElasticConfig>;
229
a8e16298
TL
230static const char *es_type_to_str(const ESType& t) {
231 switch (t) {
232 case ESType::String: return "string";
233 case ESType::Text: return "text";
234 case ESType::Keyword: return "keyword";
235 case ESType::Long: return "long";
236 case ESType::Integer: return "integer";
237 case ESType::Short: return "short";
238 case ESType::Byte: return "byte";
239 case ESType::Double: return "double";
240 case ESType::Float: return "float";
241 case ESType::Half_Float: return "half_float";
242 case ESType::Scaled_Float: return "scaled_float";
243 case ESType::Date: return "date";
244 case ESType::Boolean: return "boolean";
245 case ESType::Integer_Range: return "integer_range";
246 case ESType::Float_Range: return "float_range";
247 case ESType::Double_Range: return "date_range";
248 case ESType::Date_Range: return "date_range";
249 case ESType::Geo_Point: return "geo_point";
250 case ESType::Ip: return "ip";
251 default:
252 return "<unknown>";
253 }
254}
255
256struct es_type_v2 {
257 ESType estype;
258 const char *format{nullptr};
11fdf7f2 259 std::optional<bool> analyzed;
a8e16298
TL
260
261 es_type_v2(ESType et) : estype(et) {}
262
263 void dump(Formatter *f) const {
264 const char *type_str = es_type_to_str(estype);
265 encode_json("type", type_str, f);
266 if (format) {
267 encode_json("format", format, f);
268 }
269
270 auto is_analyzed = analyzed;
271
272 if (estype == ESType::String &&
273 !is_analyzed) {
274 is_analyzed = false;
275 }
276
277 if (is_analyzed) {
278 encode_json("index", (is_analyzed.value() ? "analyzed" : "not_analyzed"), f);
279 }
280 }
281};
31f18b77 282
a8e16298
TL
283struct es_type_v5 {
284 ESType estype;
285 const char *format{nullptr};
11fdf7f2
TL
286 std::optional<bool> analyzed;
287 std::optional<bool> index;
a8e16298
TL
288
289 es_type_v5(ESType et) : estype(et) {}
31f18b77
FG
290
291 void dump(Formatter *f) const {
a8e16298
TL
292 ESType new_estype;
293 if (estype != ESType::String) {
294 new_estype = estype;
295 } else {
296 bool is_analyzed = analyzed.value_or(false);
297 new_estype = (is_analyzed ? ESType::Text : ESType::Keyword);
298 /* index = true; ... Not setting index=true, because that's the default,
299 * and dumping a boolean value *might* be a problem when backporting this
300 * because value might get quoted
301 */
302 }
303
304 const char *type_str = es_type_to_str(new_estype);
305 encode_json("type", type_str, f);
31f18b77
FG
306 if (format) {
307 encode_json("format", format, f);
308 }
a8e16298
TL
309 if (index) {
310 encode_json("index", index.value(), f);
31f18b77
FG
311 }
312 }
313};
314
a8e16298
TL
/// Wraps a mapping flavor (es_type_v2 / es_type_v5) with chainable setters.
template <class T>
struct es_type : public T {
  es_type(T t) : T(t) {}

  es_type& set_format(const char *fmt) {
    this->format = fmt;
    return *this;
  }

  es_type& set_analyzed(bool a) {
    this->analyzed = a;
    return *this;
  }
};
328
329template <class T>
31f18b77 330struct es_index_mappings {
a8e16298
TL
331 ESType string_type {ESType::String};
332
333 es_type<T> est(ESType t) const {
334 return es_type<T>(t);
335 }
336
337 void dump_custom(const char *section, ESType type, const char *format, Formatter *f) const {
31f18b77
FG
338 f->open_object_section(section);
339 ::encode_json("type", "nested", f);
340 f->open_object_section("properties");
a8e16298
TL
341 encode_json("name", est(string_type), f);
342 encode_json("value", est(type).set_format(format), f);
31f18b77
FG
343 f->close_section(); // entry
344 f->close_section(); // custom-string
345 }
a8e16298 346
31f18b77
FG
347 void dump(Formatter *f) const {
348 f->open_object_section("object");
349 f->open_object_section("properties");
a8e16298
TL
350 encode_json("bucket", est(string_type), f);
351 encode_json("name", est(string_type), f);
352 encode_json("instance", est(string_type), f);
353 encode_json("versioned_epoch", est(ESType::Long), f);
31f18b77
FG
354 f->open_object_section("meta");
355 f->open_object_section("properties");
a8e16298
TL
356 encode_json("cache_control", est(string_type), f);
357 encode_json("content_disposition", est(string_type), f);
358 encode_json("content_encoding", est(string_type), f);
359 encode_json("content_language", est(string_type), f);
360 encode_json("content_type", est(string_type), f);
361 encode_json("storage_class", est(string_type), f);
362 encode_json("etag", est(string_type), f);
363 encode_json("expires", est(string_type), f);
364 encode_json("mtime", est(ESType::Date)
365 .set_format("strict_date_optional_time||epoch_millis"), f);
366 encode_json("size", est(ESType::Long), f);
367 dump_custom("custom-string", string_type, nullptr, f);
368 dump_custom("custom-int", ESType::Long, nullptr, f);
369 dump_custom("custom-date", ESType::Date, "strict_date_optional_time||epoch_millis", f);
31f18b77
FG
370 f->close_section(); // properties
371 f->close_section(); // meta
372 f->close_section(); // properties
373 f->close_section(); // object
374 }
375};
376
377struct es_index_settings {
378 uint32_t num_replicas;
379 uint32_t num_shards;
380
381 es_index_settings(uint32_t _replicas, uint32_t _shards) : num_replicas(_replicas), num_shards(_shards) {}
382
383 void dump(Formatter *f) const {
384 encode_json("number_of_replicas", num_replicas, f);
385 encode_json("number_of_shards", num_shards, f);
386 }
387};
388
a8e16298
TL
389struct es_index_config_base {
390 virtual ~es_index_config_base() {}
391 virtual void dump(Formatter *f) const = 0;
392};
393
394template <class T>
395struct es_index_config : public es_index_config_base {
31f18b77 396 es_index_settings settings;
a8e16298 397 es_index_mappings<T> mappings;
31f18b77 398
a8e16298 399 es_index_config(es_index_settings& _s) : settings(_s) {}
31f18b77
FG
400
401 void dump(Formatter *f) const {
402 encode_json("settings", settings, f);
403 encode_json("mappings", mappings, f);
404 }
405};
7c673cae 406
f64942e4 407static bool is_sys_attr(const std::string& attr_name){
11fdf7f2
TL
408 static constexpr std::initializer_list<const char*> rgw_sys_attrs =
409 {RGW_ATTR_PG_VER,
410 RGW_ATTR_SOURCE_ZONE,
411 RGW_ATTR_ID_TAG,
412 RGW_ATTR_TEMPURL_KEY1,
413 RGW_ATTR_TEMPURL_KEY2,
414 RGW_ATTR_UNIX1,
415 RGW_ATTR_UNIX_KEY1
f64942e4
AA
416 };
417
418 return std::find(rgw_sys_attrs.begin(), rgw_sys_attrs.end(), attr_name) != rgw_sys_attrs.end();
419}
420
a8e16298
TL
421static size_t attr_len(const bufferlist& val)
422{
423 size_t len = val.length();
424 if (len && val[len - 1] == '\0') {
425 --len;
426 }
427
428 return len;
429}
430
7c673cae
FG
431struct es_obj_metadata {
432 CephContext *cct;
31f18b77 433 ElasticConfigRef es_conf;
7c673cae
FG
434 RGWBucketInfo bucket_info;
435 rgw_obj_key key;
436 ceph::real_time mtime;
437 uint64_t size;
438 map<string, bufferlist> attrs;
31f18b77 439 uint64_t versioned_epoch;
7c673cae 440
31f18b77 441 es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info,
7c673cae 442 const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size,
31f18b77
FG
443 map<string, bufferlist>& _attrs, uint64_t _versioned_epoch) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key),
444 mtime(_mtime), size(_size), attrs(std::move(_attrs)), versioned_epoch(_versioned_epoch) {}
7c673cae
FG
445
446 void dump(Formatter *f) const {
447 map<string, string> out_attrs;
448 map<string, string> custom_meta;
449 RGWAccessControlPolicy policy;
450 set<string> permissions;
224ce89b 451 RGWObjTags obj_tags;
7c673cae
FG
452
453 for (auto i : attrs) {
454 const string& attr_name = i.first;
7c673cae
FG
455 bufferlist& val = i.second;
456
a8e16298 457 if (!boost::algorithm::starts_with(attr_name, RGW_ATTR_PREFIX)) {
7c673cae
FG
458 continue;
459 }
460
a8e16298 461 if (boost::algorithm::starts_with(attr_name, RGW_ATTR_META_PREFIX)) {
f64942e4 462 custom_meta.emplace(attr_name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1),
a8e16298
TL
463 string(val.c_str(), attr_len(val)));
464 continue;
465 }
466
467 if (boost::algorithm::starts_with(attr_name, RGW_ATTR_CRYPT_PREFIX)) {
7c673cae
FG
468 continue;
469 }
470
a8e16298
TL
471 if (boost::algorithm::starts_with(attr_name, RGW_ATTR_OLH_PREFIX)) {
472 // skip versioned object olh info
f64942e4
AA
473 continue;
474 }
7c673cae 475
f64942e4 476 if (attr_name == RGW_ATTR_ACL) {
7c673cae 477 try {
11fdf7f2
TL
478 auto i = val.cbegin();
479 decode(policy, i);
7c673cae
FG
480 } catch (buffer::error& err) {
481 ldout(cct, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl;
f64942e4 482 continue;
7c673cae
FG
483 }
484
485 const RGWAccessControlList& acl = policy.get_acl();
486
487 permissions.insert(policy.get_owner().get_id().to_str());
488 for (auto acliter : acl.get_grant_map()) {
489 const ACLGrant& grant = acliter.second;
490 if (grant.get_type().get_type() == ACL_TYPE_CANON_USER &&
491 ((uint32_t)grant.get_permission().get_permissions() & RGW_PERM_READ) != 0) {
492 rgw_user user;
493 if (grant.get_id(user)) {
494 permissions.insert(user.to_str());
495 }
496 }
497 }
f64942e4
AA
498 } else if (attr_name == RGW_ATTR_TAGS) {
499 try {
11fdf7f2
TL
500 auto tags_bl = val.cbegin();
501 decode(obj_tags, tags_bl);
f64942e4
AA
502 } catch (buffer::error& err) {
503 ldout(cct,0) << "ERROR: failed to decode obj tags for "
504 << bucket_info.bucket << "/" << key << dendl;
505 continue;
506 }
507 } else if (attr_name == RGW_ATTR_COMPRESSION) {
28e407b8 508 RGWCompressionInfo cs_info;
f64942e4 509 try {
11fdf7f2
TL
510 auto vals_bl = val.cbegin();
511 decode(cs_info, vals_bl);
f64942e4
AA
512 } catch (buffer::error& err) {
513 ldout(cct,0) << "ERROR: failed to decode compression attr for "
514 << bucket_info.bucket << "/" << key << dendl;
515 continue;
516 }
517 out_attrs.emplace("compression",std::move(cs_info.compression_type));
7c673cae 518 } else {
f64942e4
AA
519 if (!is_sys_attr(attr_name)) {
520 out_attrs.emplace(attr_name.substr(sizeof(RGW_ATTR_PREFIX) - 1),
a8e16298 521 std::string(val.c_str(), attr_len(val)));
7c673cae
FG
522 }
523 }
524 }
525 ::encode_json("bucket", bucket_info.bucket.name, f);
526 ::encode_json("name", key.name, f);
a8e16298
TL
527 string instance = key.instance;
528 if (instance.empty())
529 instance = "null";
530 ::encode_json("instance", instance, f);
31f18b77 531 ::encode_json("versioned_epoch", versioned_epoch, f);
7c673cae
FG
532 ::encode_json("owner", policy.get_owner(), f);
533 ::encode_json("permissions", permissions, f);
534 f->open_object_section("meta");
535 ::encode_json("size", size, f);
536
537 string mtime_str;
538 rgw_to_iso8601(mtime, &mtime_str);
539 ::encode_json("mtime", mtime_str, f);
540 for (auto i : out_attrs) {
541 ::encode_json(i.first.c_str(), i.second, f);
542 }
31f18b77
FG
543 map<string, string> custom_str;
544 map<string, string> custom_int;
545 map<string, string> custom_date;
546
547 for (auto i : custom_meta) {
548 auto config = bucket_info.mdsearch_config.find(i.first);
549 if (config == bucket_info.mdsearch_config.end()) {
550 if (!es_conf->explicit_custom_meta) {
551 /* default custom meta is of type string */
552 custom_str[i.first] = i.second;
553 } else {
554 ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl;
555 }
556 continue;
557 }
558 switch (config->second) {
559 case ESEntityTypeMap::ES_ENTITY_DATE:
560 custom_date[i.first] = i.second;
561 break;
562 case ESEntityTypeMap::ES_ENTITY_INT:
563 custom_int[i.first] = i.second;
564 break;
565 default:
566 custom_str[i.first] = i.second;
567 }
568 }
569
570 if (!custom_str.empty()) {
571 f->open_array_section("custom-string");
572 for (auto i : custom_str) {
573 f->open_object_section("entity");
574 ::encode_json("name", i.first.c_str(), f);
575 ::encode_json("value", i.second, f);
576 f->close_section();
577 }
578 f->close_section();
579 }
580 if (!custom_int.empty()) {
581 f->open_array_section("custom-int");
582 for (auto i : custom_int) {
583 f->open_object_section("entity");
584 ::encode_json("name", i.first.c_str(), f);
585 ::encode_json("value", i.second, f);
586 f->close_section();
587 }
588 f->close_section();
589 }
590 if (!custom_date.empty()) {
591 f->open_array_section("custom-date");
592 for (auto i : custom_date) {
593 /*
594 * try to exlicitly parse date field, otherwise elasticsearch could reject the whole doc,
595 * which will end up with failed sync
596 */
597 real_time t;
598 int r = parse_time(i.second.c_str(), &t);
599 if (r < 0) {
600 ldout(cct, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl;
601 continue;
602 }
603
604 string time_str;
605 rgw_to_iso8601(t, &time_str);
606
607 f->open_object_section("entity");
608 ::encode_json("name", i.first.c_str(), f);
609 ::encode_json("value", time_str.c_str(), f);
610 f->close_section();
7c673cae
FG
611 }
612 f->close_section();
613 }
224ce89b
WB
614 f->close_section(); // meta
615 const auto& m = obj_tags.get_tags();
616 if (m.size() > 0){
617 f->open_array_section("tagging");
618 for (const auto &it : m) {
619 f->open_object_section("tag");
620 ::encode_json("key", it.first, f);
621 ::encode_json("value",it.second, f);
622 f->close_section();
623 }
624 f->close_section(); // tagging
625 }
7c673cae 626 }
31f18b77
FG
627};
628
629class RGWElasticInitConfigCBCR : public RGWCoroutine {
630 RGWDataSyncEnv *sync_env;
631 ElasticConfigRef conf;
a8e16298
TL
632 ESInfo es_info;
633
634 struct _err_response {
635 struct err_reason {
636 vector<err_reason> root_cause;
637 string type;
638 string reason;
639 string index;
640
641 void decode_json(JSONObj *obj) {
642 JSONDecoder::decode_json("root_cause", root_cause, obj);
643 JSONDecoder::decode_json("type", type, obj);
644 JSONDecoder::decode_json("reason", reason, obj);
645 JSONDecoder::decode_json("index", index, obj);
646 }
647 } error;
648
649 void decode_json(JSONObj *obj) {
650 JSONDecoder::decode_json("error", error, obj);
651 }
652 } err_response;
653
31f18b77
FG
654public:
655 RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env,
656 ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
657 sync_env(_sync_env),
658 conf(_conf) {}
659 int operate() override {
660 reenter(this) {
661 ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl;
a8e16298
TL
662 yield call(new RGWReadRESTResourceCR<ESInfo> (sync_env->cct,
663 conf->conn.get(),
664 sync_env->http_manager,
665 "/", nullptr /*params*/,
666 &(conf->default_headers),
667 &es_info));
668 if (retcode < 0) {
669 return set_cr_error(retcode);
670 }
671
31f18b77
FG
672 yield {
673 string path = conf->get_index_path();
a8e16298 674 ldout(sync_env->cct, 5) << "got elastic version=" << es_info.get_version_str() << dendl;
31f18b77
FG
675
676 es_index_settings settings(conf->num_replicas, conf->num_shards);
31f18b77 677
a8e16298 678 std::unique_ptr<es_index_config_base> index_conf;
31f18b77 679
a8e16298
TL
680 if (es_info.version >= ES_V5) {
681 ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl;
682 index_conf.reset(new es_index_config<es_type_v5>(settings));
683 } else {
684 ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl;
685 index_conf.reset(new es_index_config<es_type_v2>(settings));
686 }
687 call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sync_env->cct,
688 conf->conn.get(),
689 sync_env->http_manager,
690 path, nullptr /*params*/,
691 &(conf->default_headers),
692 *index_conf, nullptr, &err_response));
31f18b77
FG
693 }
694 if (retcode < 0) {
a8e16298
TL
695 ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
696
697 if (err_response.error.type != "index_already_exists_exception") {
698 return set_cr_error(retcode);
699 }
700
701 ldout(sync_env->cct, 0) << "elasticsearch: index already exists, assuming external initialization" << dendl;
31f18b77
FG
702 }
703 return set_cr_done();
704 }
705 return 0;
706 }
7c673cae
FG
707
708};
709
710class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
31f18b77
FG
711 ElasticConfigRef conf;
712 uint64_t versioned_epoch;
7c673cae
FG
713public:
714 RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
715 RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
31f18b77
FG
716 ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf),
717 versioned_epoch(_versioned_epoch) {}
7c673cae
FG
718 int operate() override {
719 reenter(this) {
31f18b77 720 ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
a8e16298
TL
721 << " b=" << bucket_info.bucket << " k=" << key
722 << " size=" << size << " mtime=" << mtime << dendl;
723
7c673cae 724 yield {
31f18b77
FG
725 string path = conf->get_obj_path(bucket_info, key);
726 es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch);
7c673cae 727
31f18b77 728 call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
7c673cae
FG
729 sync_env->http_manager,
730 path, nullptr /* params */,
a8e16298 731 &(conf->default_headers),
7c673cae
FG
732 doc, nullptr /* result */));
733
734 }
735 if (retcode < 0) {
736 return set_cr_error(retcode);
737 }
738 return set_cr_done();
739 }
740 return 0;
741 }
7c673cae
FG
742};
743
744class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
31f18b77
FG
745 ElasticConfigRef conf;
746 uint64_t versioned_epoch;
7c673cae
FG
747public:
748 RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
749 RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
31f18b77
FG
750 ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
751 conf(_conf), versioned_epoch(_versioned_epoch) {
7c673cae
FG
752 }
753
754 ~RGWElasticHandleRemoteObjCR() override {}
755
756 RGWStatRemoteObjCBCR *allocate_callback() override {
31f18b77 757 return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch);
7c673cae
FG
758 }
759};
760
761class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine {
762 RGWDataSyncEnv *sync_env;
763 RGWBucketInfo bucket_info;
764 rgw_obj_key key;
765 ceph::real_time mtime;
31f18b77 766 ElasticConfigRef conf;
7c673cae
FG
767public:
768 RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
769 RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
31f18b77 770 ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
7c673cae
FG
771 bucket_info(_bucket_info), key(_key),
772 mtime(_mtime), conf(_conf) {}
773 int operate() override {
774 reenter(this) {
31f18b77
FG
775 ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
776 << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
7c673cae 777 yield {
31f18b77 778 string path = conf->get_obj_path(bucket_info, key);
7c673cae 779
31f18b77 780 call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
7c673cae
FG
781 sync_env->http_manager,
782 path, nullptr /* params */));
783 }
784 if (retcode < 0) {
785 return set_cr_error(retcode);
786 }
787 return set_cr_done();
788 }
789 return 0;
790 }
791
792};
793
794class RGWElasticDataSyncModule : public RGWDataSyncModule {
31f18b77 795 ElasticConfigRef conf;
7c673cae 796public:
11fdf7f2 797 RGWElasticDataSyncModule(CephContext *cct, const JSONFormattable& config) : conf(std::make_shared<ElasticConfig>()) {
31f18b77 798 conf->init(cct, config);
7c673cae 799 }
31f18b77
FG
800 ~RGWElasticDataSyncModule() override {}
801
802 void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
11fdf7f2 803 conf->init_instance(sync_env->store->svc.zone->get_realm(), instance_id);
7c673cae
FG
804 }
805
31f18b77
FG
806 RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
807 ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
808 return new RGWElasticInitConfigCBCR(sync_env, conf);
809 }
11fdf7f2 810 RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
91327a77 811 ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
31f18b77
FG
812 if (!conf->should_handle_operation(bucket_info)) {
813 ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
814 return nullptr;
815 }
91327a77 816 return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch.value_or(0));
7c673cae 817 }
31f18b77 818 RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
7c673cae 819 /* versioned and versioned epoch params are useless in the elasticsearch backend case */
31f18b77
FG
820 ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
821 if (!conf->should_handle_operation(bucket_info)) {
822 ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
823 return nullptr;
824 }
7c673cae
FG
825 return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
826 }
827 RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
31f18b77
FG
828 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
829 ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
7c673cae 830 << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
31f18b77 831 ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
7c673cae
FG
832 return NULL;
833 }
31f18b77
FG
834 RGWRESTConn *get_rest_conn() {
835 return conf->conn.get();
836 }
7c673cae 837
31f18b77
FG
838 string get_index_path() {
839 return conf->get_index_path();
7c673cae 840 }
a8e16298
TL
841
842 map<string, string>& get_request_headers() {
843 return conf->get_request_headers();
844 }
7c673cae
FG
845};
846
11fdf7f2 847RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
31f18b77
FG
848{
849 data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, config));
850}
851
852RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler()
853{
854 return data_handler.get();
855}
856
857RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn()
858{
859 return data_handler->get_rest_conn();
860}
861
862string RGWElasticSyncModuleInstance::get_index_path() {
863 return data_handler->get_index_path();
864}
865
a8e16298
TL
866map<string, string>& RGWElasticSyncModuleInstance::get_request_headers() {
867 return data_handler->get_request_headers();
868}
869
31f18b77
FG
870RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
871 if (dialect != RGW_REST_S3) {
872 return orig;
873 }
874 delete orig;
875 return new RGWRESTMgr_MDSearch_S3();
876}
877
11fdf7f2
TL
878int RGWElasticSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
879 string endpoint = config["endpoint"];
31f18b77 880 instance->reset(new RGWElasticSyncModuleInstance(cct, config));
7c673cae
FG
881 return 0;
882}
883