]>
Commit | Line | Data |
---|---|---|
a8e16298 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "rgw_b64.h" | |
7c673cae FG |
5 | #include "rgw_common.h" |
6 | #include "rgw_coroutine.h" | |
7 | #include "rgw_sync_module.h" | |
8 | #include "rgw_data_sync.h" | |
7c673cae | 9 | #include "rgw_sync_module_es.h" |
31f18b77 | 10 | #include "rgw_sync_module_es_rest.h" |
7c673cae FG |
11 | #include "rgw_rest_conn.h" |
12 | #include "rgw_cr_rest.h" | |
31f18b77 FG |
13 | #include "rgw_op.h" |
14 | #include "rgw_es_query.h" | |
11fdf7f2 TL |
15 | #include "rgw_zone.h" |
16 | ||
17 | #include "services/svc_zone.h" | |
31f18b77 FG |
18 | |
19 | #include "include/str_list.h" | |
20 | ||
21 | #include <boost/asio/yield.hpp> | |
7c673cae FG |
22 | |
23 | #define dout_subsys ceph_subsys_rgw | |
24 | ||
31f18b77 FG |
25 | |
26 | /* | |
27 | * whitelist utility. Config string is a list of entries, where an entry is either an item, | |
28 | * a prefix, or a suffix. An item would be the name of the entity that we'd look up, | |
29 | * a prefix would be a string ending with an asterisk, a suffix would be a string starting | |
30 | * with an asterisk. For example: | |
31 | * | |
32 | * bucket1, bucket2, foo*, *bar | |
33 | */ | |
34 | class ItemList { | |
35 | bool approve_all{false}; | |
36 | ||
37 | set<string> entries; | |
38 | set<string> prefixes; | |
39 | set<string> suffixes; | |
40 | ||
41 | void parse(const string& str) { | |
42 | list<string> l; | |
43 | ||
44 | get_str_list(str, ",", l); | |
45 | ||
46 | for (auto& entry : l) { | |
47 | entry = rgw_trim_whitespace(entry); | |
48 | if (entry.empty()) { | |
49 | continue; | |
50 | } | |
51 | ||
52 | if (entry == "*") { | |
53 | approve_all = true; | |
54 | return; | |
55 | } | |
56 | ||
57 | if (entry[0] == '*') { | |
58 | suffixes.insert(entry.substr(1)); | |
59 | continue; | |
60 | } | |
61 | ||
62 | if (entry.back() == '*') { | |
63 | prefixes.insert(entry.substr(0, entry.size() - 1)); | |
64 | continue; | |
65 | } | |
66 | ||
67 | entries.insert(entry); | |
68 | } | |
69 | } | |
70 | ||
71 | public: | |
72 | ItemList() {} | |
73 | void init(const string& str, bool def_val) { | |
74 | if (str.empty()) { | |
75 | approve_all = def_val; | |
76 | } else { | |
77 | parse(str); | |
78 | } | |
79 | } | |
80 | ||
81 | bool exists(const string& entry) { | |
82 | if (approve_all) { | |
83 | return true; | |
84 | } | |
85 | ||
86 | if (entries.find(entry) != entries.end()) { | |
87 | return true; | |
88 | } | |
89 | ||
90 | auto i = prefixes.upper_bound(entry); | |
91 | if (i != prefixes.begin()) { | |
92 | --i; | |
93 | if (boost::algorithm::starts_with(entry, *i)) { | |
94 | return true; | |
95 | } | |
96 | } | |
97 | ||
98 | for (i = suffixes.begin(); i != suffixes.end(); ++i) { | |
99 | if (boost::algorithm::ends_with(entry, *i)) { | |
100 | return true; | |
101 | } | |
102 | } | |
103 | ||
104 | return false; | |
105 | } | |
106 | }; | |
107 | ||
108 | #define ES_NUM_SHARDS_MIN 5 | |
109 | ||
110 | #define ES_NUM_SHARDS_DEFAULT 16 | |
111 | #define ES_NUM_REPLICAS_DEFAULT 1 | |
112 | ||
a8e16298 TL |
113 | using ESVersion = std::pair<int,int>; |
114 | static constexpr ESVersion ES_V5{5,0}; | |
eafe8130 | 115 | static constexpr ESVersion ES_V7{7,0}; |
a8e16298 TL |
116 | |
117 | struct ESInfo { | |
118 | std::string name; | |
119 | std::string cluster_name; | |
120 | std::string cluster_uuid; | |
121 | ESVersion version; | |
122 | ||
123 | void decode_json(JSONObj *obj); | |
124 | ||
125 | std::string get_version_str(){ | |
126 | return std::to_string(version.first) + "." + std::to_string(version.second); | |
127 | } | |
128 | }; | |
129 | ||
130 | // simple wrapper structure to wrap the es version nested type | |
131 | struct es_version_decoder { | |
132 | ESVersion version; | |
133 | ||
134 | int parse_version(const std::string& s) { | |
135 | int major, minor; | |
136 | int ret = sscanf(s.c_str(), "%d.%d", &major, &minor); | |
137 | if (ret < 0) { | |
138 | return ret; | |
139 | } | |
140 | version = std::make_pair(major,minor); | |
141 | return 0; | |
142 | } | |
143 | ||
144 | void decode_json(JSONObj *obj) { | |
145 | std::string s; | |
146 | JSONDecoder::decode_json("number",s,obj); | |
147 | if (parse_version(s) < 0) | |
148 | throw JSONDecoder::err("Failed to parse ElasticVersion"); | |
149 | } | |
150 | }; | |
151 | ||
152 | ||
153 | void ESInfo::decode_json(JSONObj *obj) | |
154 | { | |
155 | JSONDecoder::decode_json("name", name, obj); | |
156 | JSONDecoder::decode_json("cluster_name", cluster_name, obj); | |
157 | JSONDecoder::decode_json("cluster_uuid", cluster_uuid, obj); | |
158 | es_version_decoder esv; | |
159 | JSONDecoder::decode_json("version", esv, obj); | |
160 | version = std::move(esv.version); | |
161 | } | |
162 | ||
7c673cae | 163 | struct ElasticConfig { |
31f18b77 | 164 | uint64_t sync_instance{0}; |
7c673cae | 165 | string id; |
31f18b77 FG |
166 | string index_path; |
167 | std::unique_ptr<RGWRESTConn> conn; | |
168 | bool explicit_custom_meta{true}; | |
169 | string override_index_path; | |
170 | ItemList index_buckets; | |
171 | ItemList allow_owners; | |
172 | uint32_t num_shards{0}; | |
173 | uint32_t num_replicas{0}; | |
a8e16298 | 174 | std::map <string,string> default_headers = {{ "Content-Type", "application/json" }}; |
eafe8130 | 175 | ESInfo es_info; |
31f18b77 | 176 | |
11fdf7f2 TL |
177 | void init(CephContext *cct, const JSONFormattable& config) { |
178 | string elastic_endpoint = config["endpoint"]; | |
31f18b77 FG |
179 | id = string("elastic:") + elastic_endpoint; |
180 | conn.reset(new RGWRESTConn(cct, nullptr, id, { elastic_endpoint })); | |
11fdf7f2 TL |
181 | explicit_custom_meta = config["explicit_custom_meta"](true); |
182 | index_buckets.init(config["index_buckets_list"], true); /* approve all buckets by default */ | |
183 | allow_owners.init(config["approved_owners_list"], true); /* approve all bucket owners by default */ | |
184 | override_index_path = config["override_index_path"]; | |
185 | num_shards = config["num_shards"](ES_NUM_SHARDS_DEFAULT); | |
31f18b77 FG |
186 | if (num_shards < ES_NUM_SHARDS_MIN) { |
187 | num_shards = ES_NUM_SHARDS_MIN; | |
188 | } | |
11fdf7f2 TL |
189 | num_replicas = config["num_replicas"](ES_NUM_REPLICAS_DEFAULT); |
190 | if (string user = config["username"], pw = config["password"]; | |
191 | !user.empty() && !pw.empty()) { | |
a8e16298 TL |
192 | auto auth_string = user + ":" + pw; |
193 | default_headers.emplace("AUTHORIZATION", "Basic " + rgw::to_base64(auth_string)); | |
194 | } | |
11fdf7f2 | 195 | |
31f18b77 FG |
196 | } |
197 | ||
11fdf7f2 | 198 | void init_instance(const RGWRealm& realm, uint64_t instance_id) { |
31f18b77 FG |
199 | sync_instance = instance_id; |
200 | ||
201 | if (!override_index_path.empty()) { | |
202 | index_path = override_index_path; | |
203 | return; | |
204 | } | |
205 | ||
206 | char buf[32]; | |
207 | snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF)); | |
208 | ||
209 | index_path = "/rgw-" + realm.get_name() + buf; | |
210 | } | |
211 | ||
212 | string get_index_path() { | |
213 | return index_path; | |
214 | } | |
215 | ||
a8e16298 TL |
216 | map<string, string>& get_request_headers() { |
217 | return default_headers; | |
218 | } | |
219 | ||
31f18b77 | 220 | string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) { |
eafe8130 TL |
221 | if (es_info.version >= ES_V7) { |
222 | return index_path+ "/_doc/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance)); | |
223 | ; | |
224 | } else { | |
225 | return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance)); | |
226 | } | |
31f18b77 FG |
227 | } |
228 | ||
229 | bool should_handle_operation(RGWBucketInfo& bucket_info) { | |
230 | return index_buckets.exists(bucket_info.bucket.name) && | |
231 | allow_owners.exists(bucket_info.owner.to_str()); | |
232 | } | |
7c673cae FG |
233 | }; |
234 | ||
31f18b77 FG |
235 | using ElasticConfigRef = std::shared_ptr<ElasticConfig>; |
236 | ||
a8e16298 TL |
237 | static const char *es_type_to_str(const ESType& t) { |
238 | switch (t) { | |
239 | case ESType::String: return "string"; | |
240 | case ESType::Text: return "text"; | |
241 | case ESType::Keyword: return "keyword"; | |
242 | case ESType::Long: return "long"; | |
243 | case ESType::Integer: return "integer"; | |
244 | case ESType::Short: return "short"; | |
245 | case ESType::Byte: return "byte"; | |
246 | case ESType::Double: return "double"; | |
247 | case ESType::Float: return "float"; | |
248 | case ESType::Half_Float: return "half_float"; | |
249 | case ESType::Scaled_Float: return "scaled_float"; | |
250 | case ESType::Date: return "date"; | |
251 | case ESType::Boolean: return "boolean"; | |
252 | case ESType::Integer_Range: return "integer_range"; | |
253 | case ESType::Float_Range: return "float_range"; | |
254 | case ESType::Double_Range: return "date_range"; | |
255 | case ESType::Date_Range: return "date_range"; | |
256 | case ESType::Geo_Point: return "geo_point"; | |
257 | case ESType::Ip: return "ip"; | |
258 | default: | |
259 | return "<unknown>"; | |
260 | } | |
261 | } | |
262 | ||
263 | struct es_type_v2 { | |
264 | ESType estype; | |
265 | const char *format{nullptr}; | |
11fdf7f2 | 266 | std::optional<bool> analyzed; |
a8e16298 TL |
267 | |
268 | es_type_v2(ESType et) : estype(et) {} | |
269 | ||
270 | void dump(Formatter *f) const { | |
271 | const char *type_str = es_type_to_str(estype); | |
272 | encode_json("type", type_str, f); | |
273 | if (format) { | |
274 | encode_json("format", format, f); | |
275 | } | |
276 | ||
277 | auto is_analyzed = analyzed; | |
278 | ||
279 | if (estype == ESType::String && | |
280 | !is_analyzed) { | |
281 | is_analyzed = false; | |
282 | } | |
283 | ||
284 | if (is_analyzed) { | |
285 | encode_json("index", (is_analyzed.value() ? "analyzed" : "not_analyzed"), f); | |
286 | } | |
287 | } | |
288 | }; | |
31f18b77 | 289 | |
a8e16298 TL |
290 | struct es_type_v5 { |
291 | ESType estype; | |
292 | const char *format{nullptr}; | |
11fdf7f2 TL |
293 | std::optional<bool> analyzed; |
294 | std::optional<bool> index; | |
a8e16298 TL |
295 | |
296 | es_type_v5(ESType et) : estype(et) {} | |
31f18b77 FG |
297 | |
298 | void dump(Formatter *f) const { | |
a8e16298 TL |
299 | ESType new_estype; |
300 | if (estype != ESType::String) { | |
301 | new_estype = estype; | |
302 | } else { | |
303 | bool is_analyzed = analyzed.value_or(false); | |
304 | new_estype = (is_analyzed ? ESType::Text : ESType::Keyword); | |
305 | /* index = true; ... Not setting index=true, because that's the default, | |
306 | * and dumping a boolean value *might* be a problem when backporting this | |
307 | * because value might get quoted | |
308 | */ | |
309 | } | |
310 | ||
311 | const char *type_str = es_type_to_str(new_estype); | |
312 | encode_json("type", type_str, f); | |
31f18b77 FG |
313 | if (format) { |
314 | encode_json("format", format, f); | |
315 | } | |
a8e16298 TL |
316 | if (index) { |
317 | encode_json("index", index.value(), f); | |
31f18b77 FG |
318 | } |
319 | } | |
320 | }; | |
321 | ||
a8e16298 TL |
/* Adds chainable setters on top of one of the es_type_v* descriptions. */
template <class T>
struct es_type : public T {
  es_type(T t) : T(t) {}

  es_type& set_format(const char *f) {
    this->format = f;
    return *this;
  }

  es_type& set_analyzed(bool a) {
    this->analyzed = a;
    return *this;
  }
};
335 | ||
336 | template <class T> | |
31f18b77 | 337 | struct es_index_mappings { |
eafe8130 | 338 | ESVersion es_version; |
a8e16298 TL |
339 | ESType string_type {ESType::String}; |
340 | ||
eafe8130 TL |
341 | es_index_mappings(ESVersion esv):es_version(esv) { |
342 | } | |
343 | ||
a8e16298 TL |
344 | es_type<T> est(ESType t) const { |
345 | return es_type<T>(t); | |
346 | } | |
347 | ||
348 | void dump_custom(const char *section, ESType type, const char *format, Formatter *f) const { | |
31f18b77 FG |
349 | f->open_object_section(section); |
350 | ::encode_json("type", "nested", f); | |
351 | f->open_object_section("properties"); | |
a8e16298 TL |
352 | encode_json("name", est(string_type), f); |
353 | encode_json("value", est(type).set_format(format), f); | |
31f18b77 FG |
354 | f->close_section(); // entry |
355 | f->close_section(); // custom-string | |
356 | } | |
a8e16298 | 357 | |
31f18b77 | 358 | void dump(Formatter *f) const { |
eafe8130 TL |
359 | if (es_version <= ES_V7) |
360 | f->open_object_section("object"); | |
31f18b77 | 361 | f->open_object_section("properties"); |
a8e16298 TL |
362 | encode_json("bucket", est(string_type), f); |
363 | encode_json("name", est(string_type), f); | |
364 | encode_json("instance", est(string_type), f); | |
365 | encode_json("versioned_epoch", est(ESType::Long), f); | |
31f18b77 FG |
366 | f->open_object_section("meta"); |
367 | f->open_object_section("properties"); | |
a8e16298 TL |
368 | encode_json("cache_control", est(string_type), f); |
369 | encode_json("content_disposition", est(string_type), f); | |
370 | encode_json("content_encoding", est(string_type), f); | |
371 | encode_json("content_language", est(string_type), f); | |
372 | encode_json("content_type", est(string_type), f); | |
373 | encode_json("storage_class", est(string_type), f); | |
374 | encode_json("etag", est(string_type), f); | |
375 | encode_json("expires", est(string_type), f); | |
376 | encode_json("mtime", est(ESType::Date) | |
377 | .set_format("strict_date_optional_time||epoch_millis"), f); | |
378 | encode_json("size", est(ESType::Long), f); | |
379 | dump_custom("custom-string", string_type, nullptr, f); | |
380 | dump_custom("custom-int", ESType::Long, nullptr, f); | |
381 | dump_custom("custom-date", ESType::Date, "strict_date_optional_time||epoch_millis", f); | |
31f18b77 FG |
382 | f->close_section(); // properties |
383 | f->close_section(); // meta | |
384 | f->close_section(); // properties | |
eafe8130 TL |
385 | |
386 | if (es_version <= ES_V7) | |
31f18b77 FG |
387 | f->close_section(); // object |
388 | } | |
389 | }; | |
390 | ||
391 | struct es_index_settings { | |
392 | uint32_t num_replicas; | |
393 | uint32_t num_shards; | |
394 | ||
395 | es_index_settings(uint32_t _replicas, uint32_t _shards) : num_replicas(_replicas), num_shards(_shards) {} | |
396 | ||
397 | void dump(Formatter *f) const { | |
398 | encode_json("number_of_replicas", num_replicas, f); | |
399 | encode_json("number_of_shards", num_shards, f); | |
400 | } | |
401 | }; | |
402 | ||
a8e16298 TL |
403 | struct es_index_config_base { |
404 | virtual ~es_index_config_base() {} | |
405 | virtual void dump(Formatter *f) const = 0; | |
406 | }; | |
407 | ||
408 | template <class T> | |
409 | struct es_index_config : public es_index_config_base { | |
31f18b77 | 410 | es_index_settings settings; |
a8e16298 | 411 | es_index_mappings<T> mappings; |
31f18b77 | 412 | |
eafe8130 TL |
413 | es_index_config(es_index_settings& _s, ESVersion esv) : settings(_s), mappings(esv) { |
414 | } | |
31f18b77 FG |
415 | |
416 | void dump(Formatter *f) const { | |
417 | encode_json("settings", settings, f); | |
418 | encode_json("mappings", mappings, f); | |
419 | } | |
420 | }; | |
7c673cae | 421 | |
f64942e4 | 422 | static bool is_sys_attr(const std::string& attr_name){ |
11fdf7f2 TL |
423 | static constexpr std::initializer_list<const char*> rgw_sys_attrs = |
424 | {RGW_ATTR_PG_VER, | |
425 | RGW_ATTR_SOURCE_ZONE, | |
426 | RGW_ATTR_ID_TAG, | |
427 | RGW_ATTR_TEMPURL_KEY1, | |
428 | RGW_ATTR_TEMPURL_KEY2, | |
429 | RGW_ATTR_UNIX1, | |
430 | RGW_ATTR_UNIX_KEY1 | |
f64942e4 AA |
431 | }; |
432 | ||
433 | return std::find(rgw_sys_attrs.begin(), rgw_sys_attrs.end(), attr_name) != rgw_sys_attrs.end(); | |
434 | } | |
435 | ||
a8e16298 TL |
436 | static size_t attr_len(const bufferlist& val) |
437 | { | |
438 | size_t len = val.length(); | |
439 | if (len && val[len - 1] == '\0') { | |
440 | --len; | |
441 | } | |
442 | ||
443 | return len; | |
444 | } | |
445 | ||
7c673cae FG |
446 | struct es_obj_metadata { |
447 | CephContext *cct; | |
31f18b77 | 448 | ElasticConfigRef es_conf; |
7c673cae FG |
449 | RGWBucketInfo bucket_info; |
450 | rgw_obj_key key; | |
451 | ceph::real_time mtime; | |
452 | uint64_t size; | |
453 | map<string, bufferlist> attrs; | |
31f18b77 | 454 | uint64_t versioned_epoch; |
7c673cae | 455 | |
31f18b77 | 456 | es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info, |
7c673cae | 457 | const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size, |
31f18b77 FG |
458 | map<string, bufferlist>& _attrs, uint64_t _versioned_epoch) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key), |
459 | mtime(_mtime), size(_size), attrs(std::move(_attrs)), versioned_epoch(_versioned_epoch) {} | |
7c673cae FG |
460 | |
461 | void dump(Formatter *f) const { | |
462 | map<string, string> out_attrs; | |
463 | map<string, string> custom_meta; | |
464 | RGWAccessControlPolicy policy; | |
465 | set<string> permissions; | |
224ce89b | 466 | RGWObjTags obj_tags; |
7c673cae FG |
467 | |
468 | for (auto i : attrs) { | |
469 | const string& attr_name = i.first; | |
7c673cae FG |
470 | bufferlist& val = i.second; |
471 | ||
a8e16298 | 472 | if (!boost::algorithm::starts_with(attr_name, RGW_ATTR_PREFIX)) { |
7c673cae FG |
473 | continue; |
474 | } | |
475 | ||
a8e16298 | 476 | if (boost::algorithm::starts_with(attr_name, RGW_ATTR_META_PREFIX)) { |
f64942e4 | 477 | custom_meta.emplace(attr_name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1), |
a8e16298 TL |
478 | string(val.c_str(), attr_len(val))); |
479 | continue; | |
480 | } | |
481 | ||
482 | if (boost::algorithm::starts_with(attr_name, RGW_ATTR_CRYPT_PREFIX)) { | |
7c673cae FG |
483 | continue; |
484 | } | |
485 | ||
a8e16298 TL |
486 | if (boost::algorithm::starts_with(attr_name, RGW_ATTR_OLH_PREFIX)) { |
487 | // skip versioned object olh info | |
f64942e4 AA |
488 | continue; |
489 | } | |
7c673cae | 490 | |
f64942e4 | 491 | if (attr_name == RGW_ATTR_ACL) { |
7c673cae | 492 | try { |
11fdf7f2 TL |
493 | auto i = val.cbegin(); |
494 | decode(policy, i); | |
7c673cae FG |
495 | } catch (buffer::error& err) { |
496 | ldout(cct, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl; | |
f64942e4 | 497 | continue; |
7c673cae FG |
498 | } |
499 | ||
500 | const RGWAccessControlList& acl = policy.get_acl(); | |
501 | ||
502 | permissions.insert(policy.get_owner().get_id().to_str()); | |
503 | for (auto acliter : acl.get_grant_map()) { | |
504 | const ACLGrant& grant = acliter.second; | |
505 | if (grant.get_type().get_type() == ACL_TYPE_CANON_USER && | |
506 | ((uint32_t)grant.get_permission().get_permissions() & RGW_PERM_READ) != 0) { | |
507 | rgw_user user; | |
508 | if (grant.get_id(user)) { | |
509 | permissions.insert(user.to_str()); | |
510 | } | |
511 | } | |
512 | } | |
f64942e4 AA |
513 | } else if (attr_name == RGW_ATTR_TAGS) { |
514 | try { | |
11fdf7f2 TL |
515 | auto tags_bl = val.cbegin(); |
516 | decode(obj_tags, tags_bl); | |
f64942e4 AA |
517 | } catch (buffer::error& err) { |
518 | ldout(cct,0) << "ERROR: failed to decode obj tags for " | |
519 | << bucket_info.bucket << "/" << key << dendl; | |
520 | continue; | |
521 | } | |
522 | } else if (attr_name == RGW_ATTR_COMPRESSION) { | |
28e407b8 | 523 | RGWCompressionInfo cs_info; |
f64942e4 | 524 | try { |
11fdf7f2 TL |
525 | auto vals_bl = val.cbegin(); |
526 | decode(cs_info, vals_bl); | |
f64942e4 AA |
527 | } catch (buffer::error& err) { |
528 | ldout(cct,0) << "ERROR: failed to decode compression attr for " | |
529 | << bucket_info.bucket << "/" << key << dendl; | |
530 | continue; | |
531 | } | |
532 | out_attrs.emplace("compression",std::move(cs_info.compression_type)); | |
7c673cae | 533 | } else { |
f64942e4 AA |
534 | if (!is_sys_attr(attr_name)) { |
535 | out_attrs.emplace(attr_name.substr(sizeof(RGW_ATTR_PREFIX) - 1), | |
a8e16298 | 536 | std::string(val.c_str(), attr_len(val))); |
7c673cae FG |
537 | } |
538 | } | |
539 | } | |
540 | ::encode_json("bucket", bucket_info.bucket.name, f); | |
541 | ::encode_json("name", key.name, f); | |
a8e16298 TL |
542 | string instance = key.instance; |
543 | if (instance.empty()) | |
544 | instance = "null"; | |
545 | ::encode_json("instance", instance, f); | |
31f18b77 | 546 | ::encode_json("versioned_epoch", versioned_epoch, f); |
7c673cae FG |
547 | ::encode_json("owner", policy.get_owner(), f); |
548 | ::encode_json("permissions", permissions, f); | |
549 | f->open_object_section("meta"); | |
550 | ::encode_json("size", size, f); | |
551 | ||
552 | string mtime_str; | |
553 | rgw_to_iso8601(mtime, &mtime_str); | |
554 | ::encode_json("mtime", mtime_str, f); | |
555 | for (auto i : out_attrs) { | |
556 | ::encode_json(i.first.c_str(), i.second, f); | |
557 | } | |
31f18b77 FG |
558 | map<string, string> custom_str; |
559 | map<string, string> custom_int; | |
560 | map<string, string> custom_date; | |
561 | ||
562 | for (auto i : custom_meta) { | |
563 | auto config = bucket_info.mdsearch_config.find(i.first); | |
564 | if (config == bucket_info.mdsearch_config.end()) { | |
565 | if (!es_conf->explicit_custom_meta) { | |
566 | /* default custom meta is of type string */ | |
567 | custom_str[i.first] = i.second; | |
568 | } else { | |
569 | ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl; | |
570 | } | |
571 | continue; | |
572 | } | |
573 | switch (config->second) { | |
574 | case ESEntityTypeMap::ES_ENTITY_DATE: | |
575 | custom_date[i.first] = i.second; | |
576 | break; | |
577 | case ESEntityTypeMap::ES_ENTITY_INT: | |
578 | custom_int[i.first] = i.second; | |
579 | break; | |
580 | default: | |
581 | custom_str[i.first] = i.second; | |
582 | } | |
583 | } | |
584 | ||
585 | if (!custom_str.empty()) { | |
586 | f->open_array_section("custom-string"); | |
587 | for (auto i : custom_str) { | |
588 | f->open_object_section("entity"); | |
589 | ::encode_json("name", i.first.c_str(), f); | |
590 | ::encode_json("value", i.second, f); | |
591 | f->close_section(); | |
592 | } | |
593 | f->close_section(); | |
594 | } | |
595 | if (!custom_int.empty()) { | |
596 | f->open_array_section("custom-int"); | |
597 | for (auto i : custom_int) { | |
598 | f->open_object_section("entity"); | |
599 | ::encode_json("name", i.first.c_str(), f); | |
600 | ::encode_json("value", i.second, f); | |
601 | f->close_section(); | |
602 | } | |
603 | f->close_section(); | |
604 | } | |
605 | if (!custom_date.empty()) { | |
606 | f->open_array_section("custom-date"); | |
607 | for (auto i : custom_date) { | |
608 | /* | |
609 | * try to exlicitly parse date field, otherwise elasticsearch could reject the whole doc, | |
610 | * which will end up with failed sync | |
611 | */ | |
612 | real_time t; | |
613 | int r = parse_time(i.second.c_str(), &t); | |
614 | if (r < 0) { | |
615 | ldout(cct, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl; | |
616 | continue; | |
617 | } | |
618 | ||
619 | string time_str; | |
620 | rgw_to_iso8601(t, &time_str); | |
621 | ||
622 | f->open_object_section("entity"); | |
623 | ::encode_json("name", i.first.c_str(), f); | |
624 | ::encode_json("value", time_str.c_str(), f); | |
625 | f->close_section(); | |
7c673cae FG |
626 | } |
627 | f->close_section(); | |
628 | } | |
224ce89b WB |
629 | f->close_section(); // meta |
630 | const auto& m = obj_tags.get_tags(); | |
631 | if (m.size() > 0){ | |
632 | f->open_array_section("tagging"); | |
633 | for (const auto &it : m) { | |
634 | f->open_object_section("tag"); | |
635 | ::encode_json("key", it.first, f); | |
636 | ::encode_json("value",it.second, f); | |
637 | f->close_section(); | |
638 | } | |
639 | f->close_section(); // tagging | |
640 | } | |
7c673cae | 641 | } |
31f18b77 FG |
642 | }; |
643 | ||
644 | class RGWElasticInitConfigCBCR : public RGWCoroutine { | |
645 | RGWDataSyncEnv *sync_env; | |
646 | ElasticConfigRef conf; | |
a8e16298 TL |
647 | ESInfo es_info; |
648 | ||
649 | struct _err_response { | |
650 | struct err_reason { | |
651 | vector<err_reason> root_cause; | |
652 | string type; | |
653 | string reason; | |
654 | string index; | |
655 | ||
656 | void decode_json(JSONObj *obj) { | |
657 | JSONDecoder::decode_json("root_cause", root_cause, obj); | |
658 | JSONDecoder::decode_json("type", type, obj); | |
659 | JSONDecoder::decode_json("reason", reason, obj); | |
660 | JSONDecoder::decode_json("index", index, obj); | |
661 | } | |
662 | } error; | |
663 | ||
664 | void decode_json(JSONObj *obj) { | |
665 | JSONDecoder::decode_json("error", error, obj); | |
666 | } | |
667 | } err_response; | |
668 | ||
31f18b77 FG |
669 | public: |
670 | RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env, | |
671 | ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), | |
672 | sync_env(_sync_env), | |
673 | conf(_conf) {} | |
674 | int operate() override { | |
675 | reenter(this) { | |
676 | ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl; | |
a8e16298 TL |
677 | yield call(new RGWReadRESTResourceCR<ESInfo> (sync_env->cct, |
678 | conf->conn.get(), | |
679 | sync_env->http_manager, | |
680 | "/", nullptr /*params*/, | |
681 | &(conf->default_headers), | |
682 | &es_info)); | |
683 | if (retcode < 0) { | |
684 | return set_cr_error(retcode); | |
685 | } | |
686 | ||
31f18b77 FG |
687 | yield { |
688 | string path = conf->get_index_path(); | |
a8e16298 | 689 | ldout(sync_env->cct, 5) << "got elastic version=" << es_info.get_version_str() << dendl; |
31f18b77 FG |
690 | |
691 | es_index_settings settings(conf->num_replicas, conf->num_shards); | |
31f18b77 | 692 | |
a8e16298 | 693 | std::unique_ptr<es_index_config_base> index_conf; |
31f18b77 | 694 | |
a8e16298 TL |
695 | if (es_info.version >= ES_V5) { |
696 | ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl; | |
eafe8130 | 697 | index_conf.reset(new es_index_config<es_type_v5>(settings, es_info.version)); |
a8e16298 TL |
698 | } else { |
699 | ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl; | |
eafe8130 | 700 | index_conf.reset(new es_index_config<es_type_v2>(settings, es_info.version)); |
a8e16298 TL |
701 | } |
702 | call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sync_env->cct, | |
703 | conf->conn.get(), | |
704 | sync_env->http_manager, | |
705 | path, nullptr /*params*/, | |
706 | &(conf->default_headers), | |
707 | *index_conf, nullptr, &err_response)); | |
31f18b77 FG |
708 | } |
709 | if (retcode < 0) { | |
a8e16298 TL |
710 | ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl; |
711 | ||
eafe8130 TL |
712 | if (err_response.error.type != "index_already_exists_exception" && |
713 | err_response.error.type != "resource_already_exists_exception") { | |
a8e16298 TL |
714 | return set_cr_error(retcode); |
715 | } | |
716 | ||
717 | ldout(sync_env->cct, 0) << "elasticsearch: index already exists, assuming external initialization" << dendl; | |
31f18b77 FG |
718 | } |
719 | return set_cr_done(); | |
720 | } | |
721 | return 0; | |
722 | } | |
7c673cae FG |
723 | |
724 | }; | |
725 | ||
726 | class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { | |
31f18b77 FG |
727 | ElasticConfigRef conf; |
728 | uint64_t versioned_epoch; | |
7c673cae FG |
729 | public: |
730 | RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, | |
731 | RGWBucketInfo& _bucket_info, rgw_obj_key& _key, | |
31f18b77 FG |
732 | ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf), |
733 | versioned_epoch(_versioned_epoch) {} | |
7c673cae FG |
734 | int operate() override { |
735 | reenter(this) { | |
31f18b77 | 736 | ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone |
a8e16298 TL |
737 | << " b=" << bucket_info.bucket << " k=" << key |
738 | << " size=" << size << " mtime=" << mtime << dendl; | |
739 | ||
7c673cae | 740 | yield { |
31f18b77 FG |
741 | string path = conf->get_obj_path(bucket_info, key); |
742 | es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch); | |
7c673cae | 743 | |
31f18b77 | 744 | call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(), |
7c673cae FG |
745 | sync_env->http_manager, |
746 | path, nullptr /* params */, | |
a8e16298 | 747 | &(conf->default_headers), |
7c673cae FG |
748 | doc, nullptr /* result */)); |
749 | ||
750 | } | |
751 | if (retcode < 0) { | |
752 | return set_cr_error(retcode); | |
753 | } | |
754 | return set_cr_done(); | |
755 | } | |
756 | return 0; | |
757 | } | |
7c673cae FG |
758 | }; |
759 | ||
760 | class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR { | |
31f18b77 FG |
761 | ElasticConfigRef conf; |
762 | uint64_t versioned_epoch; | |
7c673cae FG |
763 | public: |
764 | RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, | |
765 | RGWBucketInfo& _bucket_info, rgw_obj_key& _key, | |
31f18b77 FG |
766 | ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), |
767 | conf(_conf), versioned_epoch(_versioned_epoch) { | |
7c673cae FG |
768 | } |
769 | ||
770 | ~RGWElasticHandleRemoteObjCR() override {} | |
771 | ||
772 | RGWStatRemoteObjCBCR *allocate_callback() override { | |
31f18b77 | 773 | return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch); |
7c673cae FG |
774 | } |
775 | }; | |
776 | ||
777 | class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine { | |
778 | RGWDataSyncEnv *sync_env; | |
779 | RGWBucketInfo bucket_info; | |
780 | rgw_obj_key key; | |
781 | ceph::real_time mtime; | |
31f18b77 | 782 | ElasticConfigRef conf; |
7c673cae FG |
783 | public: |
784 | RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env, | |
785 | RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime, | |
31f18b77 | 786 | ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), |
7c673cae FG |
787 | bucket_info(_bucket_info), key(_key), |
788 | mtime(_mtime), conf(_conf) {} | |
789 | int operate() override { | |
790 | reenter(this) { | |
31f18b77 FG |
791 | ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone |
792 | << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl; | |
7c673cae | 793 | yield { |
31f18b77 | 794 | string path = conf->get_obj_path(bucket_info, key); |
7c673cae | 795 | |
31f18b77 | 796 | call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(), |
7c673cae FG |
797 | sync_env->http_manager, |
798 | path, nullptr /* params */)); | |
799 | } | |
800 | if (retcode < 0) { | |
801 | return set_cr_error(retcode); | |
802 | } | |
803 | return set_cr_done(); | |
804 | } | |
805 | return 0; | |
806 | } | |
807 | ||
808 | }; | |
809 | ||
810 | class RGWElasticDataSyncModule : public RGWDataSyncModule { | |
31f18b77 | 811 | ElasticConfigRef conf; |
7c673cae | 812 | public: |
11fdf7f2 | 813 | RGWElasticDataSyncModule(CephContext *cct, const JSONFormattable& config) : conf(std::make_shared<ElasticConfig>()) { |
31f18b77 | 814 | conf->init(cct, config); |
7c673cae | 815 | } |
31f18b77 FG |
816 | ~RGWElasticDataSyncModule() override {} |
817 | ||
818 | void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override { | |
11fdf7f2 | 819 | conf->init_instance(sync_env->store->svc.zone->get_realm(), instance_id); |
eafe8130 TL |
820 | // try to get elastic search version |
821 | RGWCoroutinesManager crs(sync_env->store->ctx(), sync_env->store->get_cr_registry()); | |
822 | RGWHTTPManager http_manager(sync_env->store->ctx(), crs.get_completion_mgr()); | |
823 | int ret = http_manager.start(); | |
824 | if (ret < 0) { | |
825 | return; | |
826 | } | |
827 | ret = crs.run(new RGWReadRESTResourceCR<ESInfo>(sync_env->cct, | |
828 | conf->conn.get(), | |
829 | &http_manager, | |
830 | "/", nullptr, | |
831 | &(conf->default_headers), | |
832 | &(conf->es_info))); | |
833 | http_manager.stop(); | |
834 | if (ret < 0) { | |
835 | ldout(sync_env->cct, 1) << conf->id << ": fetch elastic info failed: " << ret << dendl; | |
836 | } else { | |
837 | ldout(sync_env->cct, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl; | |
838 | } | |
7c673cae FG |
839 | } |
840 | ||
31f18b77 FG |
841 | RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override { |
842 | ldout(sync_env->cct, 5) << conf->id << ": init" << dendl; | |
843 | return new RGWElasticInitConfigCBCR(sync_env, conf); | |
844 | } | |
11fdf7f2 | 845 | RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override { |
91327a77 | 846 | ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; |
31f18b77 FG |
847 | if (!conf->should_handle_operation(bucket_info)) { |
848 | ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; | |
849 | return nullptr; | |
850 | } | |
91327a77 | 851 | return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch.value_or(0)); |
7c673cae | 852 | } |
31f18b77 | 853 | RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { |
7c673cae | 854 | /* versioned and versioned epoch params are useless in the elasticsearch backend case */ |
31f18b77 FG |
855 | ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; |
856 | if (!conf->should_handle_operation(bucket_info)) { | |
857 | ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; | |
858 | return nullptr; | |
859 | } | |
7c673cae FG |
860 | return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf); |
861 | } | |
862 | RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, | |
31f18b77 FG |
863 | rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { |
864 | ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime | |
7c673cae | 865 | << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; |
31f18b77 | 866 | ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl; |
7c673cae FG |
867 | return NULL; |
868 | } | |
31f18b77 FG |
869 | RGWRESTConn *get_rest_conn() { |
870 | return conf->conn.get(); | |
871 | } | |
7c673cae | 872 | |
31f18b77 FG |
873 | string get_index_path() { |
874 | return conf->get_index_path(); | |
7c673cae | 875 | } |
a8e16298 TL |
876 | |
877 | map<string, string>& get_request_headers() { | |
878 | return conf->get_request_headers(); | |
879 | } | |
7c673cae FG |
880 | }; |
881 | ||
11fdf7f2 | 882 | RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const JSONFormattable& config) |
31f18b77 FG |
883 | { |
884 | data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, config)); | |
885 | } | |
886 | ||
887 | RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler() | |
888 | { | |
889 | return data_handler.get(); | |
890 | } | |
891 | ||
892 | RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn() | |
893 | { | |
894 | return data_handler->get_rest_conn(); | |
895 | } | |
896 | ||
897 | string RGWElasticSyncModuleInstance::get_index_path() { | |
898 | return data_handler->get_index_path(); | |
899 | } | |
900 | ||
a8e16298 TL |
901 | map<string, string>& RGWElasticSyncModuleInstance::get_request_headers() { |
902 | return data_handler->get_request_headers(); | |
903 | } | |
904 | ||
31f18b77 FG |
905 | RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) { |
906 | if (dialect != RGW_REST_S3) { | |
907 | return orig; | |
908 | } | |
909 | delete orig; | |
910 | return new RGWRESTMgr_MDSearch_S3(); | |
911 | } | |
912 | ||
11fdf7f2 TL |
913 | int RGWElasticSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { |
914 | string endpoint = config["endpoint"]; | |
31f18b77 | 915 | instance->reset(new RGWElasticSyncModuleInstance(cct, config)); |
7c673cae FG |
916 | return 0; |
917 | } | |
918 |