]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_sync_module_es.cc
import quincy 17.2.0
[ceph.git] / ceph / src / rgw / rgw_sync_module_es.cc
CommitLineData
a8e16298 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
a8e16298
TL
3
4#include "rgw_b64.h"
7c673cae
FG
5#include "rgw_common.h"
6#include "rgw_coroutine.h"
7#include "rgw_sync_module.h"
8#include "rgw_data_sync.h"
7c673cae 9#include "rgw_sync_module_es.h"
31f18b77 10#include "rgw_sync_module_es_rest.h"
7c673cae
FG
11#include "rgw_rest_conn.h"
12#include "rgw_cr_rest.h"
31f18b77
FG
13#include "rgw_op.h"
14#include "rgw_es_query.h"
11fdf7f2
TL
15#include "rgw_zone.h"
16
17#include "services/svc_zone.h"
31f18b77
FG
18
19#include "include/str_list.h"
20
21#include <boost/asio/yield.hpp>
7c673cae
FG
22
23#define dout_subsys ceph_subsys_rgw
24
20effc67 25using namespace std;
31f18b77
FG
26
27/*
f67539c2 28 * allowlist utility. Config string is a list of entries, where an entry is either an item,
31f18b77
FG
29 * a prefix, or a suffix. An item would be the name of the entity that we'd look up,
30 * a prefix would be a string ending with an asterisk, a suffix would be a string starting
31 * with an asterisk. For example:
32 *
33 * bucket1, bucket2, foo*, *bar
34 */
35class ItemList {
36 bool approve_all{false};
37
38 set<string> entries;
39 set<string> prefixes;
40 set<string> suffixes;
41
42 void parse(const string& str) {
43 list<string> l;
44
45 get_str_list(str, ",", l);
46
47 for (auto& entry : l) {
48 entry = rgw_trim_whitespace(entry);
49 if (entry.empty()) {
50 continue;
51 }
52
53 if (entry == "*") {
54 approve_all = true;
55 return;
56 }
57
58 if (entry[0] == '*') {
59 suffixes.insert(entry.substr(1));
60 continue;
61 }
62
63 if (entry.back() == '*') {
64 prefixes.insert(entry.substr(0, entry.size() - 1));
65 continue;
66 }
67
68 entries.insert(entry);
69 }
70 }
71
72public:
73 ItemList() {}
74 void init(const string& str, bool def_val) {
75 if (str.empty()) {
76 approve_all = def_val;
77 } else {
78 parse(str);
79 }
80 }
81
82 bool exists(const string& entry) {
83 if (approve_all) {
84 return true;
85 }
86
87 if (entries.find(entry) != entries.end()) {
88 return true;
89 }
90
91 auto i = prefixes.upper_bound(entry);
92 if (i != prefixes.begin()) {
93 --i;
94 if (boost::algorithm::starts_with(entry, *i)) {
95 return true;
96 }
97 }
98
99 for (i = suffixes.begin(); i != suffixes.end(); ++i) {
100 if (boost::algorithm::ends_with(entry, *i)) {
101 return true;
102 }
103 }
104
105 return false;
106 }
107};
108
// Index sizing defaults; num_shards from the config is clamped to at least
// ES_NUM_SHARDS_MIN.
static constexpr int ES_NUM_SHARDS_MIN = 5;

static constexpr int ES_NUM_SHARDS_DEFAULT = 16;
static constexpr int ES_NUM_REPLICAS_DEFAULT = 1;

// Elasticsearch version as (major, minor); std::pair compares
// lexicographically, so e.g. {7,10} >= ES_V7 and {6,8} < ES_V7.
using ESVersion = std::pair<int,int>;
static constexpr ESVersion ES_V5{5,0};
static constexpr ESVersion ES_V7{7,0};
a8e16298
TL
117
118struct ESInfo {
119 std::string name;
120 std::string cluster_name;
121 std::string cluster_uuid;
122 ESVersion version;
123
124 void decode_json(JSONObj *obj);
125
126 std::string get_version_str(){
127 return std::to_string(version.first) + "." + std::to_string(version.second);
128 }
129};
130
131// simple wrapper structure to wrap the es version nested type
132struct es_version_decoder {
133 ESVersion version;
134
135 int parse_version(const std::string& s) {
136 int major, minor;
137 int ret = sscanf(s.c_str(), "%d.%d", &major, &minor);
138 if (ret < 0) {
139 return ret;
140 }
141 version = std::make_pair(major,minor);
142 return 0;
143 }
144
145 void decode_json(JSONObj *obj) {
146 std::string s;
147 JSONDecoder::decode_json("number",s,obj);
148 if (parse_version(s) < 0)
149 throw JSONDecoder::err("Failed to parse ElasticVersion");
150 }
151};
152
153
154void ESInfo::decode_json(JSONObj *obj)
155{
156 JSONDecoder::decode_json("name", name, obj);
157 JSONDecoder::decode_json("cluster_name", cluster_name, obj);
158 JSONDecoder::decode_json("cluster_uuid", cluster_uuid, obj);
159 es_version_decoder esv;
160 JSONDecoder::decode_json("version", esv, obj);
161 version = std::move(esv.version);
162}
163
7c673cae 164struct ElasticConfig {
31f18b77 165 uint64_t sync_instance{0};
7c673cae 166 string id;
31f18b77
FG
167 string index_path;
168 std::unique_ptr<RGWRESTConn> conn;
169 bool explicit_custom_meta{true};
170 string override_index_path;
171 ItemList index_buckets;
172 ItemList allow_owners;
173 uint32_t num_shards{0};
174 uint32_t num_replicas{0};
a8e16298 175 std::map <string,string> default_headers = {{ "Content-Type", "application/json" }};
eafe8130 176 ESInfo es_info;
31f18b77 177
11fdf7f2
TL
178 void init(CephContext *cct, const JSONFormattable& config) {
179 string elastic_endpoint = config["endpoint"];
31f18b77 180 id = string("elastic:") + elastic_endpoint;
20effc67 181 conn.reset(new RGWRESTConn(cct, (RGWSI_Zone*)nullptr, id, { elastic_endpoint }, nullopt /* region */ ));
11fdf7f2
TL
182 explicit_custom_meta = config["explicit_custom_meta"](true);
183 index_buckets.init(config["index_buckets_list"], true); /* approve all buckets by default */
184 allow_owners.init(config["approved_owners_list"], true); /* approve all bucket owners by default */
185 override_index_path = config["override_index_path"];
186 num_shards = config["num_shards"](ES_NUM_SHARDS_DEFAULT);
31f18b77
FG
187 if (num_shards < ES_NUM_SHARDS_MIN) {
188 num_shards = ES_NUM_SHARDS_MIN;
189 }
11fdf7f2
TL
190 num_replicas = config["num_replicas"](ES_NUM_REPLICAS_DEFAULT);
191 if (string user = config["username"], pw = config["password"];
192 !user.empty() && !pw.empty()) {
a8e16298
TL
193 auto auth_string = user + ":" + pw;
194 default_headers.emplace("AUTHORIZATION", "Basic " + rgw::to_base64(auth_string));
195 }
11fdf7f2 196
31f18b77
FG
197 }
198
11fdf7f2 199 void init_instance(const RGWRealm& realm, uint64_t instance_id) {
31f18b77
FG
200 sync_instance = instance_id;
201
202 if (!override_index_path.empty()) {
203 index_path = override_index_path;
204 return;
205 }
206
207 char buf[32];
208 snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF));
209
210 index_path = "/rgw-" + realm.get_name() + buf;
211 }
212
213 string get_index_path() {
214 return index_path;
215 }
216
a8e16298
TL
217 map<string, string>& get_request_headers() {
218 return default_headers;
219 }
220
31f18b77 221 string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) {
eafe8130
TL
222 if (es_info.version >= ES_V7) {
223 return index_path+ "/_doc/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
224;
225 } else {
226 return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
227 }
31f18b77
FG
228 }
229
230 bool should_handle_operation(RGWBucketInfo& bucket_info) {
231 return index_buckets.exists(bucket_info.bucket.name) &&
232 allow_owners.exists(bucket_info.owner.to_str());
233 }
7c673cae
FG
234};
235
31f18b77
FG
236using ElasticConfigRef = std::shared_ptr<ElasticConfig>;
237
a8e16298
TL
238static const char *es_type_to_str(const ESType& t) {
239 switch (t) {
240 case ESType::String: return "string";
241 case ESType::Text: return "text";
242 case ESType::Keyword: return "keyword";
243 case ESType::Long: return "long";
244 case ESType::Integer: return "integer";
245 case ESType::Short: return "short";
246 case ESType::Byte: return "byte";
247 case ESType::Double: return "double";
248 case ESType::Float: return "float";
249 case ESType::Half_Float: return "half_float";
250 case ESType::Scaled_Float: return "scaled_float";
251 case ESType::Date: return "date";
252 case ESType::Boolean: return "boolean";
253 case ESType::Integer_Range: return "integer_range";
254 case ESType::Float_Range: return "float_range";
255 case ESType::Double_Range: return "date_range";
256 case ESType::Date_Range: return "date_range";
257 case ESType::Geo_Point: return "geo_point";
258 case ESType::Ip: return "ip";
259 default:
260 return "<unknown>";
261 }
262}
263
264struct es_type_v2 {
265 ESType estype;
266 const char *format{nullptr};
11fdf7f2 267 std::optional<bool> analyzed;
a8e16298
TL
268
269 es_type_v2(ESType et) : estype(et) {}
270
271 void dump(Formatter *f) const {
272 const char *type_str = es_type_to_str(estype);
273 encode_json("type", type_str, f);
274 if (format) {
275 encode_json("format", format, f);
276 }
277
278 auto is_analyzed = analyzed;
279
280 if (estype == ESType::String &&
281 !is_analyzed) {
282 is_analyzed = false;
283 }
284
285 if (is_analyzed) {
286 encode_json("index", (is_analyzed.value() ? "analyzed" : "not_analyzed"), f);
287 }
288 }
289};
31f18b77 290
a8e16298
TL
291struct es_type_v5 {
292 ESType estype;
293 const char *format{nullptr};
11fdf7f2
TL
294 std::optional<bool> analyzed;
295 std::optional<bool> index;
a8e16298
TL
296
297 es_type_v5(ESType et) : estype(et) {}
31f18b77
FG
298
299 void dump(Formatter *f) const {
a8e16298
TL
300 ESType new_estype;
301 if (estype != ESType::String) {
302 new_estype = estype;
303 } else {
304 bool is_analyzed = analyzed.value_or(false);
305 new_estype = (is_analyzed ? ESType::Text : ESType::Keyword);
306 /* index = true; ... Not setting index=true, because that's the default,
307 * and dumping a boolean value *might* be a problem when backporting this
308 * because value might get quoted
309 */
310 }
311
312 const char *type_str = es_type_to_str(new_estype);
313 encode_json("type", type_str, f);
31f18b77
FG
314 if (format) {
315 encode_json("format", format, f);
316 }
a8e16298
TL
317 if (index) {
318 encode_json("index", index.value(), f);
31f18b77
FG
319 }
320 }
321};
322
a8e16298
TL
/* Wraps an encoder (es_type_v2 / es_type_v5) with chainable setters. */
template <class T>
struct es_type : public T {
  es_type(T t) : T(t) {}

  es_type& set_format(const char *f) {
    this->format = f;
    return *this;
  }

  es_type& set_analyzed(bool a) {
    this->analyzed = a;
    return *this;
  }
};
336
337template <class T>
31f18b77 338struct es_index_mappings {
eafe8130 339 ESVersion es_version;
a8e16298
TL
340 ESType string_type {ESType::String};
341
eafe8130
TL
342 es_index_mappings(ESVersion esv):es_version(esv) {
343 }
344
a8e16298
TL
345 es_type<T> est(ESType t) const {
346 return es_type<T>(t);
347 }
348
349 void dump_custom(const char *section, ESType type, const char *format, Formatter *f) const {
31f18b77
FG
350 f->open_object_section(section);
351 ::encode_json("type", "nested", f);
352 f->open_object_section("properties");
a8e16298
TL
353 encode_json("name", est(string_type), f);
354 encode_json("value", est(type).set_format(format), f);
31f18b77
FG
355 f->close_section(); // entry
356 f->close_section(); // custom-string
357 }
a8e16298 358
31f18b77 359 void dump(Formatter *f) const {
eafe8130
TL
360 if (es_version <= ES_V7)
361 f->open_object_section("object");
31f18b77 362 f->open_object_section("properties");
a8e16298
TL
363 encode_json("bucket", est(string_type), f);
364 encode_json("name", est(string_type), f);
365 encode_json("instance", est(string_type), f);
366 encode_json("versioned_epoch", est(ESType::Long), f);
31f18b77
FG
367 f->open_object_section("meta");
368 f->open_object_section("properties");
a8e16298
TL
369 encode_json("cache_control", est(string_type), f);
370 encode_json("content_disposition", est(string_type), f);
371 encode_json("content_encoding", est(string_type), f);
372 encode_json("content_language", est(string_type), f);
373 encode_json("content_type", est(string_type), f);
374 encode_json("storage_class", est(string_type), f);
375 encode_json("etag", est(string_type), f);
376 encode_json("expires", est(string_type), f);
377 encode_json("mtime", est(ESType::Date)
378 .set_format("strict_date_optional_time||epoch_millis"), f);
379 encode_json("size", est(ESType::Long), f);
380 dump_custom("custom-string", string_type, nullptr, f);
381 dump_custom("custom-int", ESType::Long, nullptr, f);
382 dump_custom("custom-date", ESType::Date, "strict_date_optional_time||epoch_millis", f);
31f18b77
FG
383 f->close_section(); // properties
384 f->close_section(); // meta
385 f->close_section(); // properties
eafe8130
TL
386
387 if (es_version <= ES_V7)
31f18b77
FG
388 f->close_section(); // object
389 }
390};
391
392struct es_index_settings {
393 uint32_t num_replicas;
394 uint32_t num_shards;
395
396 es_index_settings(uint32_t _replicas, uint32_t _shards) : num_replicas(_replicas), num_shards(_shards) {}
397
398 void dump(Formatter *f) const {
399 encode_json("number_of_replicas", num_replicas, f);
400 encode_json("number_of_shards", num_shards, f);
401 }
402};
403
a8e16298
TL
404struct es_index_config_base {
405 virtual ~es_index_config_base() {}
406 virtual void dump(Formatter *f) const = 0;
407};
408
409template <class T>
410struct es_index_config : public es_index_config_base {
31f18b77 411 es_index_settings settings;
a8e16298 412 es_index_mappings<T> mappings;
31f18b77 413
eafe8130
TL
414 es_index_config(es_index_settings& _s, ESVersion esv) : settings(_s), mappings(esv) {
415 }
31f18b77
FG
416
417 void dump(Formatter *f) const {
418 encode_json("settings", settings, f);
419 encode_json("mappings", mappings, f);
420 }
421};
7c673cae 422
f64942e4 423static bool is_sys_attr(const std::string& attr_name){
11fdf7f2
TL
424 static constexpr std::initializer_list<const char*> rgw_sys_attrs =
425 {RGW_ATTR_PG_VER,
426 RGW_ATTR_SOURCE_ZONE,
427 RGW_ATTR_ID_TAG,
428 RGW_ATTR_TEMPURL_KEY1,
429 RGW_ATTR_TEMPURL_KEY2,
430 RGW_ATTR_UNIX1,
431 RGW_ATTR_UNIX_KEY1
f64942e4
AA
432 };
433
434 return std::find(rgw_sys_attrs.begin(), rgw_sys_attrs.end(), attr_name) != rgw_sys_attrs.end();
435}
436
a8e16298
TL
437static size_t attr_len(const bufferlist& val)
438{
439 size_t len = val.length();
440 if (len && val[len - 1] == '\0') {
441 --len;
442 }
443
444 return len;
445}
446
7c673cae 447struct es_obj_metadata {
20effc67 448 const DoutPrefixProvider *dpp;
7c673cae 449 CephContext *cct;
31f18b77 450 ElasticConfigRef es_conf;
7c673cae
FG
451 RGWBucketInfo bucket_info;
452 rgw_obj_key key;
453 ceph::real_time mtime;
454 uint64_t size;
455 map<string, bufferlist> attrs;
31f18b77 456 uint64_t versioned_epoch;
7c673cae 457
31f18b77 458 es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info,
7c673cae 459 const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size,
31f18b77
FG
460 map<string, bufferlist>& _attrs, uint64_t _versioned_epoch) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key),
461 mtime(_mtime), size(_size), attrs(std::move(_attrs)), versioned_epoch(_versioned_epoch) {}
7c673cae
FG
462
463 void dump(Formatter *f) const {
464 map<string, string> out_attrs;
465 map<string, string> custom_meta;
466 RGWAccessControlPolicy policy;
467 set<string> permissions;
224ce89b 468 RGWObjTags obj_tags;
7c673cae
FG
469
470 for (auto i : attrs) {
471 const string& attr_name = i.first;
7c673cae
FG
472 bufferlist& val = i.second;
473
a8e16298 474 if (!boost::algorithm::starts_with(attr_name, RGW_ATTR_PREFIX)) {
7c673cae
FG
475 continue;
476 }
477
a8e16298 478 if (boost::algorithm::starts_with(attr_name, RGW_ATTR_META_PREFIX)) {
f64942e4 479 custom_meta.emplace(attr_name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1),
a8e16298
TL
480 string(val.c_str(), attr_len(val)));
481 continue;
482 }
483
484 if (boost::algorithm::starts_with(attr_name, RGW_ATTR_CRYPT_PREFIX)) {
7c673cae
FG
485 continue;
486 }
487
a8e16298
TL
488 if (boost::algorithm::starts_with(attr_name, RGW_ATTR_OLH_PREFIX)) {
489 // skip versioned object olh info
f64942e4
AA
490 continue;
491 }
7c673cae 492
f64942e4 493 if (attr_name == RGW_ATTR_ACL) {
7c673cae 494 try {
11fdf7f2
TL
495 auto i = val.cbegin();
496 decode(policy, i);
7c673cae 497 } catch (buffer::error& err) {
20effc67 498 ldpp_dout(dpp, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl;
f64942e4 499 continue;
7c673cae
FG
500 }
501
502 const RGWAccessControlList& acl = policy.get_acl();
503
504 permissions.insert(policy.get_owner().get_id().to_str());
505 for (auto acliter : acl.get_grant_map()) {
506 const ACLGrant& grant = acliter.second;
507 if (grant.get_type().get_type() == ACL_TYPE_CANON_USER &&
508 ((uint32_t)grant.get_permission().get_permissions() & RGW_PERM_READ) != 0) {
509 rgw_user user;
510 if (grant.get_id(user)) {
511 permissions.insert(user.to_str());
512 }
513 }
514 }
f64942e4
AA
515 } else if (attr_name == RGW_ATTR_TAGS) {
516 try {
11fdf7f2
TL
517 auto tags_bl = val.cbegin();
518 decode(obj_tags, tags_bl);
f64942e4 519 } catch (buffer::error& err) {
20effc67 520 ldpp_dout(dpp, 0) << "ERROR: failed to decode obj tags for "
f64942e4
AA
521 << bucket_info.bucket << "/" << key << dendl;
522 continue;
523 }
524 } else if (attr_name == RGW_ATTR_COMPRESSION) {
28e407b8 525 RGWCompressionInfo cs_info;
f64942e4 526 try {
11fdf7f2
TL
527 auto vals_bl = val.cbegin();
528 decode(cs_info, vals_bl);
f64942e4 529 } catch (buffer::error& err) {
20effc67 530 ldpp_dout(dpp, 0) << "ERROR: failed to decode compression attr for "
f64942e4
AA
531 << bucket_info.bucket << "/" << key << dendl;
532 continue;
533 }
534 out_attrs.emplace("compression",std::move(cs_info.compression_type));
7c673cae 535 } else {
f64942e4
AA
536 if (!is_sys_attr(attr_name)) {
537 out_attrs.emplace(attr_name.substr(sizeof(RGW_ATTR_PREFIX) - 1),
a8e16298 538 std::string(val.c_str(), attr_len(val)));
7c673cae
FG
539 }
540 }
541 }
542 ::encode_json("bucket", bucket_info.bucket.name, f);
543 ::encode_json("name", key.name, f);
a8e16298
TL
544 string instance = key.instance;
545 if (instance.empty())
546 instance = "null";
547 ::encode_json("instance", instance, f);
31f18b77 548 ::encode_json("versioned_epoch", versioned_epoch, f);
7c673cae
FG
549 ::encode_json("owner", policy.get_owner(), f);
550 ::encode_json("permissions", permissions, f);
551 f->open_object_section("meta");
552 ::encode_json("size", size, f);
553
554 string mtime_str;
555 rgw_to_iso8601(mtime, &mtime_str);
556 ::encode_json("mtime", mtime_str, f);
557 for (auto i : out_attrs) {
558 ::encode_json(i.first.c_str(), i.second, f);
559 }
31f18b77
FG
560 map<string, string> custom_str;
561 map<string, string> custom_int;
562 map<string, string> custom_date;
563
564 for (auto i : custom_meta) {
565 auto config = bucket_info.mdsearch_config.find(i.first);
566 if (config == bucket_info.mdsearch_config.end()) {
567 if (!es_conf->explicit_custom_meta) {
568 /* default custom meta is of type string */
569 custom_str[i.first] = i.second;
570 } else {
20effc67 571 ldpp_dout(dpp, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl;
31f18b77
FG
572 }
573 continue;
574 }
575 switch (config->second) {
576 case ESEntityTypeMap::ES_ENTITY_DATE:
577 custom_date[i.first] = i.second;
578 break;
579 case ESEntityTypeMap::ES_ENTITY_INT:
580 custom_int[i.first] = i.second;
581 break;
582 default:
583 custom_str[i.first] = i.second;
584 }
585 }
586
587 if (!custom_str.empty()) {
588 f->open_array_section("custom-string");
589 for (auto i : custom_str) {
590 f->open_object_section("entity");
591 ::encode_json("name", i.first.c_str(), f);
592 ::encode_json("value", i.second, f);
593 f->close_section();
594 }
595 f->close_section();
596 }
597 if (!custom_int.empty()) {
598 f->open_array_section("custom-int");
599 for (auto i : custom_int) {
600 f->open_object_section("entity");
601 ::encode_json("name", i.first.c_str(), f);
602 ::encode_json("value", i.second, f);
603 f->close_section();
604 }
605 f->close_section();
606 }
607 if (!custom_date.empty()) {
608 f->open_array_section("custom-date");
609 for (auto i : custom_date) {
610 /*
611 * try to exlicitly parse date field, otherwise elasticsearch could reject the whole doc,
612 * which will end up with failed sync
613 */
614 real_time t;
615 int r = parse_time(i.second.c_str(), &t);
616 if (r < 0) {
20effc67 617 ldpp_dout(dpp, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl;
31f18b77
FG
618 continue;
619 }
620
621 string time_str;
622 rgw_to_iso8601(t, &time_str);
623
624 f->open_object_section("entity");
625 ::encode_json("name", i.first.c_str(), f);
626 ::encode_json("value", time_str.c_str(), f);
627 f->close_section();
7c673cae
FG
628 }
629 f->close_section();
630 }
224ce89b
WB
631 f->close_section(); // meta
632 const auto& m = obj_tags.get_tags();
633 if (m.size() > 0){
634 f->open_array_section("tagging");
635 for (const auto &it : m) {
636 f->open_object_section("tag");
637 ::encode_json("key", it.first, f);
638 ::encode_json("value",it.second, f);
639 f->close_section();
640 }
641 f->close_section(); // tagging
642 }
7c673cae 643 }
31f18b77
FG
644};
645
/*
 * Coroutine: GET "/" on the Elasticsearch endpoint and decode the reply into
 * conf->es_info. The version stored there later selects the index mapping
 * encoder (RGWElasticPutIndexCBCR) and the document path
 * (ElasticConfig::get_obj_path).
 */
class RGWElasticGetESInfoCBCR : public RGWCoroutine {
public:
  RGWElasticGetESInfoCBCR(RGWDataSyncCtx *_sc,
                          ElasticConfigRef _conf) : RGWCoroutine(_sc->cct),
                                                    sc(_sc), sync_env(_sc->env),
                                                    conf(_conf) {}
  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      ldpp_dout(dpp, 5) << conf->id << ": get elasticsearch info for zone: " << sc->source_zone << dendl;
      // default_headers carries Content-Type and, if configured, basic auth
      yield call(new RGWReadRESTResourceCR<ESInfo> (sync_env->cct,
                                                    conf->conn.get(),
                                                    sync_env->http_manager,
                                                    "/", nullptr /*params*/,
                                                    &(conf->default_headers),
                                                    &(conf->es_info)));
      if (retcode < 0) {
        ldpp_dout(dpp, 5) << conf->id << ": get elasticsearch failed: " << retcode << dendl;
        return set_cr_error(retcode);
      }

      ldpp_dout(dpp, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl;
      return set_cr_done();
    }
    return 0;
  }
private:
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  ElasticConfigRef conf;
};
676
/*
 * Coroutine: create the index at conf->get_index_path() with the configured
 * shard/replica settings and the object-metadata mappings. The mapping
 * encoder is chosen from conf->es_info.version, so RGWElasticGetESInfoCBCR
 * must have completed first (RGWElasticInitConfigCBCR runs them in that
 * order). An "index already exists" error is treated as success.
 */
class RGWElasticPutIndexCBCR : public RGWCoroutine {
public:
  RGWElasticPutIndexCBCR(RGWDataSyncCtx *_sc,
                         ElasticConfigRef _conf) : RGWCoroutine(_sc->cct),
                                                   sc(_sc), sync_env(_sc->env),
                                                   conf(_conf) {}
  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      ldpp_dout(dpp, 5) << conf->id << ": put elasticsearch index for zone: " << sc->source_zone << dendl;

      yield {
        string path = conf->get_index_path();
        es_index_settings settings(conf->num_replicas, conf->num_shards);
        std::unique_ptr<es_index_config_base> index_conf;

        // ES >= 5 uses text/keyword instead of the legacy "string" field type
        if (conf->es_info.version >= ES_V5) {
          ldpp_dout(dpp, 0) << "elasticsearch: index mapping: version >= 5" << dendl;
          index_conf.reset(new es_index_config<es_type_v5>(settings, conf->es_info.version));
        } else {
          ldpp_dout(dpp, 0) << "elasticsearch: index mapping: version < 5" << dendl;
          index_conf.reset(new es_index_config<es_type_v2>(settings, conf->es_info.version));
        }
        // NOTE(review): path/settings/index_conf are locals of this yield block
        // and are destroyed before the request runs. This presumably relies on
        // RGWPutRESTResourceCR serializing *index_conf in its constructor —
        // confirm in rgw_cr_rest.h.
        call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sc->cct,
                                                                                  conf->conn.get(),
                                                                                  sync_env->http_manager,
                                                                                  path, nullptr /*params*/,
                                                                                  &(conf->default_headers),
                                                                                  *index_conf, nullptr, &err_response));
      }
      if (retcode < 0) {

        // Both error types mean the index already exists (presumably the
        // older and newer ES names for the same condition).
        if (err_response.error.type != "index_already_exists_exception" &&
            err_response.error.type != "resource_already_exists_exception") {
          ldpp_dout(dpp, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
          return set_cr_error(retcode);
        }

        ldpp_dout(dpp, 0) << "elasticsearch: index already exists, assuming external initialization" << dendl;
      }
      return set_cr_done();
    }
    return 0;
  }

private:
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  ElasticConfigRef conf;

  // Decoded Elasticsearch error body: {"error": {root_cause, type, reason, index}}
  struct _err_response {
    struct err_reason {
      vector<err_reason> root_cause;
      string type;
      string reason;
      string index;

      void decode_json(JSONObj *obj) {
        JSONDecoder::decode_json("root_cause", root_cause, obj);
        JSONDecoder::decode_json("type", type, obj);
        JSONDecoder::decode_json("reason", reason, obj);
        JSONDecoder::decode_json("index", index, obj);
      }
    } error;

    void decode_json(JSONObj *obj) {
      JSONDecoder::decode_json("error", error, obj);
    }
  } err_response;
};
746
/*
 * Coroutine run at sync init. It first fetches the cluster info (needed for
 * the version), then creates the index. It stops at the first failure.
 */
class RGWElasticInitConfigCBCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  ElasticConfigRef conf;

public:
  RGWElasticInitConfigCBCR(RGWDataSyncCtx *_sc,
                           ElasticConfigRef _conf) : RGWCoroutine(_sc->cct),
                                                     sc(_sc), sync_env(_sc->env),
                                                     conf(_conf) {}
  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {

      yield call(new RGWElasticGetESInfoCBCR(sc, conf));

      if (retcode < 0) {
        return set_cr_error(retcode);
      }

      // the index mapping depends on conf->es_info.version fetched above
      yield call(new RGWElasticPutIndexCBCR(sc, conf));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }

};
776
/*
 * Stat callback: after the remote object has been stat'ed, PUT its metadata
 * document (es_obj_metadata) at conf->get_obj_path(). key, size, mtime and
 * attrs come from the RGWStatRemoteObjCBCR base — presumably filled in by the
 * stat; verify against rgw_data_sync.h.
 */
class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
  rgw_bucket_sync_pipe sync_pipe;
  ElasticConfigRef conf;
  uint64_t versioned_epoch;
public:
  RGWElasticHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
                                rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
                                ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
                                                                                     sync_pipe(_sync_pipe), conf(_conf),
                                                                                     versioned_epoch(_versioned_epoch) {}
  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      ldpp_dout(dpp, 10) << ": stat of remote obj: z=" << sc->source_zone
                         << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key
                         << " size=" << size << " mtime=" << mtime << dendl;

      yield {
        string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key);
        // NOTE: es_obj_metadata moves `attrs` into `doc`. No dpp is passed,
        // so doc's own error logging has no prefix provider.
        es_obj_metadata doc(sync_env->cct, conf, sync_pipe.dest_bucket_info, key, mtime, size, attrs, versioned_epoch);

        call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
                                                            sync_env->http_manager,
                                                            path, nullptr /* params */,
                                                            &(conf->default_headers),
                                                            doc, nullptr /* result */));

      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
812
813class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
9f95a23c 814 rgw_bucket_sync_pipe sync_pipe;
31f18b77
FG
815 ElasticConfigRef conf;
816 uint64_t versioned_epoch;
7c673cae 817public:
9f95a23c
TL
818 RGWElasticHandleRemoteObjCR(RGWDataSyncCtx *_sc,
819 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
820 ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
821 sync_pipe(_sync_pipe),
31f18b77 822 conf(_conf), versioned_epoch(_versioned_epoch) {
7c673cae
FG
823 }
824
825 ~RGWElasticHandleRemoteObjCR() override {}
826
827 RGWStatRemoteObjCBCR *allocate_callback() override {
9f95a23c 828 return new RGWElasticHandleRemoteObjCBCR(sc, sync_pipe, key, conf, versioned_epoch);
7c673cae
FG
829 }
830};
831
/*
 * Coroutine: DELETE the object's document at conf->get_obj_path(). mtime is
 * only used for logging here.
 */
class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_bucket_sync_pipe sync_pipe;
  rgw_obj_key key;
  ceph::real_time mtime;
  ElasticConfigRef conf;
public:
  RGWElasticRemoveRemoteObjCBCR(RGWDataSyncCtx *_sc,
                                rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
                                ElasticConfigRef _conf) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
                                                          sync_pipe(_sync_pipe), key(_key),
                                                          mtime(_mtime), conf(_conf) {}
  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      ldpp_dout(dpp, 10) << ": remove remote obj: z=" << sc->source_zone
                         << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
      yield {
        string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key);

        // NOTE(review): unlike the GET/PUT requests, conf->default_headers is
        // not passed here, so the basic-auth header set up in
        // ElasticConfig::init is not sent with the DELETE. Confirm whether
        // RGWDeleteRESTResourceCR can accept headers.
        call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
                                         sync_env->http_manager,
                                         path, nullptr /* params */));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }

};
865
866class RGWElasticDataSyncModule : public RGWDataSyncModule {
31f18b77 867 ElasticConfigRef conf;
7c673cae 868public:
20effc67 869 RGWElasticDataSyncModule(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) : conf(std::make_shared<ElasticConfig>()) {
31f18b77 870 conf->init(cct, config);
7c673cae 871 }
31f18b77
FG
872 ~RGWElasticDataSyncModule() override {}
873
9f95a23c
TL
874 void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
875 conf->init_instance(sc->env->svc->zone->get_realm(), instance_id);
876 }
877
20effc67
TL
878 RGWCoroutine *init_sync(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc) override {
879 ldpp_dout(dpp, 5) << conf->id << ": init" << dendl;
9f95a23c 880 return new RGWElasticInitConfigCBCR(sc, conf);
7c673cae
FG
881 }
882
20effc67
TL
883 RGWCoroutine *start_sync(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc) override {
884 ldpp_dout(dpp, 5) << conf->id << ": start_sync" << dendl;
9f95a23c
TL
885 // try to get elastic search version
886 return new RGWElasticGetESInfoCBCR(sc, conf);
31f18b77 887 }
9f95a23c 888
20effc67
TL
889 RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
890 ldpp_dout(dpp, 10) << conf->id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
9f95a23c 891 if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
20effc67 892 ldpp_dout(dpp, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
31f18b77
FG
893 return nullptr;
894 }
9f95a23c 895 return new RGWElasticHandleRemoteObjCR(sc, sync_pipe, key, conf, versioned_epoch.value_or(0));
7c673cae 896 }
20effc67 897 RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
7c673cae 898 /* versioned and versioned epoch params are useless in the elasticsearch backend case */
20effc67 899 ldpp_dout(dpp, 10) << conf->id << ": rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
9f95a23c 900 if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
20effc67 901 ldpp_dout(dpp, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
31f18b77
FG
902 return nullptr;
903 }
9f95a23c 904 return new RGWElasticRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, conf);
7c673cae 905 }
20effc67 906 RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
31f18b77 907 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
20effc67 908 ldpp_dout(dpp, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
7c673cae 909 << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
20effc67 910 ldpp_dout(dpp, 10) << conf->id << ": skipping operation (not handled)" << dendl;
7c673cae
FG
911 return NULL;
912 }
31f18b77
FG
913 RGWRESTConn *get_rest_conn() {
914 return conf->conn.get();
915 }
7c673cae 916
31f18b77
FG
917 string get_index_path() {
918 return conf->get_index_path();
7c673cae 919 }
a8e16298
TL
920
921 map<string, string>& get_request_headers() {
922 return conf->get_request_headers();
923 }
7c673cae
FG
924};
925
20effc67 926RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config)
31f18b77 927{
20effc67 928 data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(dpp, cct, config));
31f18b77
FG
929}
930
931RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler()
932{
933 return data_handler.get();
934}
935
936RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn()
937{
938 return data_handler->get_rest_conn();
939}
940
941string RGWElasticSyncModuleInstance::get_index_path() {
942 return data_handler->get_index_path();
943}
944
a8e16298
TL
945map<string, string>& RGWElasticSyncModuleInstance::get_request_headers() {
946 return data_handler->get_request_headers();
947}
948
31f18b77
FG
949RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
950 if (dialect != RGW_REST_S3) {
951 return orig;
952 }
953 delete orig;
954 return new RGWRESTMgr_MDSearch_S3();
955}
956
20effc67 957int RGWElasticSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
11fdf7f2 958 string endpoint = config["endpoint"];
20effc67 959 instance->reset(new RGWElasticSyncModuleInstance(dpp, cct, config));
7c673cae
FG
960 return 0;
961}
962