]>
Commit | Line | Data |
---|---|---|
a8e16298 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "rgw_b64.h" | |
7c673cae FG |
5 | #include "rgw_common.h" |
6 | #include "rgw_coroutine.h" | |
7 | #include "rgw_sync_module.h" | |
8 | #include "rgw_data_sync.h" | |
7c673cae | 9 | #include "rgw_sync_module_es.h" |
31f18b77 | 10 | #include "rgw_sync_module_es_rest.h" |
7c673cae FG |
11 | #include "rgw_rest_conn.h" |
12 | #include "rgw_cr_rest.h" | |
31f18b77 FG |
13 | #include "rgw_op.h" |
14 | #include "rgw_es_query.h" | |
11fdf7f2 TL |
15 | #include "rgw_zone.h" |
16 | ||
17 | #include "services/svc_zone.h" | |
31f18b77 FG |
18 | |
19 | #include "include/str_list.h" | |
20 | ||
21 | #include <boost/asio/yield.hpp> | |
7c673cae FG |
22 | |
23 | #define dout_subsys ceph_subsys_rgw | |
24 | ||
31f18b77 FG |
25 | |
26 | /* | |
27 | * whitelist utility. Config string is a list of entries, where an entry is either an item, | |
28 | * a prefix, or a suffix. An item would be the name of the entity that we'd look up, | |
29 | * a prefix would be a string ending with an asterisk, a suffix would be a string starting | |
30 | * with an asterisk. For example: | |
31 | * | |
32 | * bucket1, bucket2, foo*, *bar | |
33 | */ | |
34 | class ItemList { | |
35 | bool approve_all{false}; | |
36 | ||
37 | set<string> entries; | |
38 | set<string> prefixes; | |
39 | set<string> suffixes; | |
40 | ||
41 | void parse(const string& str) { | |
42 | list<string> l; | |
43 | ||
44 | get_str_list(str, ",", l); | |
45 | ||
46 | for (auto& entry : l) { | |
47 | entry = rgw_trim_whitespace(entry); | |
48 | if (entry.empty()) { | |
49 | continue; | |
50 | } | |
51 | ||
52 | if (entry == "*") { | |
53 | approve_all = true; | |
54 | return; | |
55 | } | |
56 | ||
57 | if (entry[0] == '*') { | |
58 | suffixes.insert(entry.substr(1)); | |
59 | continue; | |
60 | } | |
61 | ||
62 | if (entry.back() == '*') { | |
63 | prefixes.insert(entry.substr(0, entry.size() - 1)); | |
64 | continue; | |
65 | } | |
66 | ||
67 | entries.insert(entry); | |
68 | } | |
69 | } | |
70 | ||
71 | public: | |
72 | ItemList() {} | |
73 | void init(const string& str, bool def_val) { | |
74 | if (str.empty()) { | |
75 | approve_all = def_val; | |
76 | } else { | |
77 | parse(str); | |
78 | } | |
79 | } | |
80 | ||
81 | bool exists(const string& entry) { | |
82 | if (approve_all) { | |
83 | return true; | |
84 | } | |
85 | ||
86 | if (entries.find(entry) != entries.end()) { | |
87 | return true; | |
88 | } | |
89 | ||
90 | auto i = prefixes.upper_bound(entry); | |
91 | if (i != prefixes.begin()) { | |
92 | --i; | |
93 | if (boost::algorithm::starts_with(entry, *i)) { | |
94 | return true; | |
95 | } | |
96 | } | |
97 | ||
98 | for (i = suffixes.begin(); i != suffixes.end(); ++i) { | |
99 | if (boost::algorithm::ends_with(entry, *i)) { | |
100 | return true; | |
101 | } | |
102 | } | |
103 | ||
104 | return false; | |
105 | } | |
106 | }; | |
107 | ||
/* lower bound applied to the configured number of index shards */
#define ES_NUM_SHARDS_MIN 5

/* values used when the sync module config does not provide them */
#define ES_NUM_SHARDS_DEFAULT 16
#define ES_NUM_REPLICAS_DEFAULT 1
112 | ||
a8e16298 TL |
113 | using ESVersion = std::pair<int,int>; |
114 | static constexpr ESVersion ES_V5{5,0}; | |
115 | ||
116 | struct ESInfo { | |
117 | std::string name; | |
118 | std::string cluster_name; | |
119 | std::string cluster_uuid; | |
120 | ESVersion version; | |
121 | ||
122 | void decode_json(JSONObj *obj); | |
123 | ||
124 | std::string get_version_str(){ | |
125 | return std::to_string(version.first) + "." + std::to_string(version.second); | |
126 | } | |
127 | }; | |
128 | ||
129 | // simple wrapper structure to wrap the es version nested type | |
130 | struct es_version_decoder { | |
131 | ESVersion version; | |
132 | ||
133 | int parse_version(const std::string& s) { | |
134 | int major, minor; | |
135 | int ret = sscanf(s.c_str(), "%d.%d", &major, &minor); | |
136 | if (ret < 0) { | |
137 | return ret; | |
138 | } | |
139 | version = std::make_pair(major,minor); | |
140 | return 0; | |
141 | } | |
142 | ||
143 | void decode_json(JSONObj *obj) { | |
144 | std::string s; | |
145 | JSONDecoder::decode_json("number",s,obj); | |
146 | if (parse_version(s) < 0) | |
147 | throw JSONDecoder::err("Failed to parse ElasticVersion"); | |
148 | } | |
149 | }; | |
150 | ||
151 | ||
152 | void ESInfo::decode_json(JSONObj *obj) | |
153 | { | |
154 | JSONDecoder::decode_json("name", name, obj); | |
155 | JSONDecoder::decode_json("cluster_name", cluster_name, obj); | |
156 | JSONDecoder::decode_json("cluster_uuid", cluster_uuid, obj); | |
157 | es_version_decoder esv; | |
158 | JSONDecoder::decode_json("version", esv, obj); | |
159 | version = std::move(esv.version); | |
160 | } | |
161 | ||
7c673cae | 162 | struct ElasticConfig { |
31f18b77 | 163 | uint64_t sync_instance{0}; |
7c673cae | 164 | string id; |
31f18b77 FG |
165 | string index_path; |
166 | std::unique_ptr<RGWRESTConn> conn; | |
167 | bool explicit_custom_meta{true}; | |
168 | string override_index_path; | |
169 | ItemList index_buckets; | |
170 | ItemList allow_owners; | |
171 | uint32_t num_shards{0}; | |
172 | uint32_t num_replicas{0}; | |
a8e16298 | 173 | std::map <string,string> default_headers = {{ "Content-Type", "application/json" }}; |
31f18b77 | 174 | |
11fdf7f2 TL |
175 | void init(CephContext *cct, const JSONFormattable& config) { |
176 | string elastic_endpoint = config["endpoint"]; | |
31f18b77 FG |
177 | id = string("elastic:") + elastic_endpoint; |
178 | conn.reset(new RGWRESTConn(cct, nullptr, id, { elastic_endpoint })); | |
11fdf7f2 TL |
179 | explicit_custom_meta = config["explicit_custom_meta"](true); |
180 | index_buckets.init(config["index_buckets_list"], true); /* approve all buckets by default */ | |
181 | allow_owners.init(config["approved_owners_list"], true); /* approve all bucket owners by default */ | |
182 | override_index_path = config["override_index_path"]; | |
183 | num_shards = config["num_shards"](ES_NUM_SHARDS_DEFAULT); | |
31f18b77 FG |
184 | if (num_shards < ES_NUM_SHARDS_MIN) { |
185 | num_shards = ES_NUM_SHARDS_MIN; | |
186 | } | |
11fdf7f2 TL |
187 | num_replicas = config["num_replicas"](ES_NUM_REPLICAS_DEFAULT); |
188 | if (string user = config["username"], pw = config["password"]; | |
189 | !user.empty() && !pw.empty()) { | |
a8e16298 TL |
190 | auto auth_string = user + ":" + pw; |
191 | default_headers.emplace("AUTHORIZATION", "Basic " + rgw::to_base64(auth_string)); | |
192 | } | |
11fdf7f2 | 193 | |
31f18b77 FG |
194 | } |
195 | ||
11fdf7f2 | 196 | void init_instance(const RGWRealm& realm, uint64_t instance_id) { |
31f18b77 FG |
197 | sync_instance = instance_id; |
198 | ||
199 | if (!override_index_path.empty()) { | |
200 | index_path = override_index_path; | |
201 | return; | |
202 | } | |
203 | ||
204 | char buf[32]; | |
205 | snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF)); | |
206 | ||
207 | index_path = "/rgw-" + realm.get_name() + buf; | |
208 | } | |
209 | ||
210 | string get_index_path() { | |
211 | return index_path; | |
212 | } | |
213 | ||
a8e16298 TL |
214 | map<string, string>& get_request_headers() { |
215 | return default_headers; | |
216 | } | |
217 | ||
31f18b77 | 218 | string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) { |
91327a77 | 219 | return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance)); |
31f18b77 FG |
220 | } |
221 | ||
222 | bool should_handle_operation(RGWBucketInfo& bucket_info) { | |
223 | return index_buckets.exists(bucket_info.bucket.name) && | |
224 | allow_owners.exists(bucket_info.owner.to_str()); | |
225 | } | |
7c673cae FG |
226 | }; |
227 | ||
31f18b77 FG |
228 | using ElasticConfigRef = std::shared_ptr<ElasticConfig>; |
229 | ||
a8e16298 TL |
230 | static const char *es_type_to_str(const ESType& t) { |
231 | switch (t) { | |
232 | case ESType::String: return "string"; | |
233 | case ESType::Text: return "text"; | |
234 | case ESType::Keyword: return "keyword"; | |
235 | case ESType::Long: return "long"; | |
236 | case ESType::Integer: return "integer"; | |
237 | case ESType::Short: return "short"; | |
238 | case ESType::Byte: return "byte"; | |
239 | case ESType::Double: return "double"; | |
240 | case ESType::Float: return "float"; | |
241 | case ESType::Half_Float: return "half_float"; | |
242 | case ESType::Scaled_Float: return "scaled_float"; | |
243 | case ESType::Date: return "date"; | |
244 | case ESType::Boolean: return "boolean"; | |
245 | case ESType::Integer_Range: return "integer_range"; | |
246 | case ESType::Float_Range: return "float_range"; | |
247 | case ESType::Double_Range: return "date_range"; | |
248 | case ESType::Date_Range: return "date_range"; | |
249 | case ESType::Geo_Point: return "geo_point"; | |
250 | case ESType::Ip: return "ip"; | |
251 | default: | |
252 | return "<unknown>"; | |
253 | } | |
254 | } | |
255 | ||
256 | struct es_type_v2 { | |
257 | ESType estype; | |
258 | const char *format{nullptr}; | |
11fdf7f2 | 259 | std::optional<bool> analyzed; |
a8e16298 TL |
260 | |
261 | es_type_v2(ESType et) : estype(et) {} | |
262 | ||
263 | void dump(Formatter *f) const { | |
264 | const char *type_str = es_type_to_str(estype); | |
265 | encode_json("type", type_str, f); | |
266 | if (format) { | |
267 | encode_json("format", format, f); | |
268 | } | |
269 | ||
270 | auto is_analyzed = analyzed; | |
271 | ||
272 | if (estype == ESType::String && | |
273 | !is_analyzed) { | |
274 | is_analyzed = false; | |
275 | } | |
276 | ||
277 | if (is_analyzed) { | |
278 | encode_json("index", (is_analyzed.value() ? "analyzed" : "not_analyzed"), f); | |
279 | } | |
280 | } | |
281 | }; | |
31f18b77 | 282 | |
a8e16298 TL |
283 | struct es_type_v5 { |
284 | ESType estype; | |
285 | const char *format{nullptr}; | |
11fdf7f2 TL |
286 | std::optional<bool> analyzed; |
287 | std::optional<bool> index; | |
a8e16298 TL |
288 | |
289 | es_type_v5(ESType et) : estype(et) {} | |
31f18b77 FG |
290 | |
291 | void dump(Formatter *f) const { | |
a8e16298 TL |
292 | ESType new_estype; |
293 | if (estype != ESType::String) { | |
294 | new_estype = estype; | |
295 | } else { | |
296 | bool is_analyzed = analyzed.value_or(false); | |
297 | new_estype = (is_analyzed ? ESType::Text : ESType::Keyword); | |
298 | /* index = true; ... Not setting index=true, because that's the default, | |
299 | * and dumping a boolean value *might* be a problem when backporting this | |
300 | * because value might get quoted | |
301 | */ | |
302 | } | |
303 | ||
304 | const char *type_str = es_type_to_str(new_estype); | |
305 | encode_json("type", type_str, f); | |
31f18b77 FG |
306 | if (format) { |
307 | encode_json("format", format, f); | |
308 | } | |
a8e16298 TL |
309 | if (index) { |
310 | encode_json("index", index.value(), f); | |
31f18b77 FG |
311 | } |
312 | } | |
313 | }; | |
314 | ||
a8e16298 TL |
// thin wrapper over a version specific type description (T) that adds
// chainable setters for its format and analyzed members
template <class T>
struct es_type : public T {
  es_type(T t) : T(t) {}

  es_type& set_format(const char *f) {
    this->format = f;
    return *this;
  }

  es_type& set_analyzed(bool a) {
    this->analyzed = a;
    return *this;
  }
};
328 | ||
329 | template <class T> | |
31f18b77 | 330 | struct es_index_mappings { |
a8e16298 TL |
331 | ESType string_type {ESType::String}; |
332 | ||
333 | es_type<T> est(ESType t) const { | |
334 | return es_type<T>(t); | |
335 | } | |
336 | ||
337 | void dump_custom(const char *section, ESType type, const char *format, Formatter *f) const { | |
31f18b77 FG |
338 | f->open_object_section(section); |
339 | ::encode_json("type", "nested", f); | |
340 | f->open_object_section("properties"); | |
a8e16298 TL |
341 | encode_json("name", est(string_type), f); |
342 | encode_json("value", est(type).set_format(format), f); | |
31f18b77 FG |
343 | f->close_section(); // entry |
344 | f->close_section(); // custom-string | |
345 | } | |
a8e16298 | 346 | |
31f18b77 FG |
347 | void dump(Formatter *f) const { |
348 | f->open_object_section("object"); | |
349 | f->open_object_section("properties"); | |
a8e16298 TL |
350 | encode_json("bucket", est(string_type), f); |
351 | encode_json("name", est(string_type), f); | |
352 | encode_json("instance", est(string_type), f); | |
353 | encode_json("versioned_epoch", est(ESType::Long), f); | |
31f18b77 FG |
354 | f->open_object_section("meta"); |
355 | f->open_object_section("properties"); | |
a8e16298 TL |
356 | encode_json("cache_control", est(string_type), f); |
357 | encode_json("content_disposition", est(string_type), f); | |
358 | encode_json("content_encoding", est(string_type), f); | |
359 | encode_json("content_language", est(string_type), f); | |
360 | encode_json("content_type", est(string_type), f); | |
361 | encode_json("storage_class", est(string_type), f); | |
362 | encode_json("etag", est(string_type), f); | |
363 | encode_json("expires", est(string_type), f); | |
364 | encode_json("mtime", est(ESType::Date) | |
365 | .set_format("strict_date_optional_time||epoch_millis"), f); | |
366 | encode_json("size", est(ESType::Long), f); | |
367 | dump_custom("custom-string", string_type, nullptr, f); | |
368 | dump_custom("custom-int", ESType::Long, nullptr, f); | |
369 | dump_custom("custom-date", ESType::Date, "strict_date_optional_time||epoch_millis", f); | |
31f18b77 FG |
370 | f->close_section(); // properties |
371 | f->close_section(); // meta | |
372 | f->close_section(); // properties | |
373 | f->close_section(); // object | |
374 | } | |
375 | }; | |
376 | ||
377 | struct es_index_settings { | |
378 | uint32_t num_replicas; | |
379 | uint32_t num_shards; | |
380 | ||
381 | es_index_settings(uint32_t _replicas, uint32_t _shards) : num_replicas(_replicas), num_shards(_shards) {} | |
382 | ||
383 | void dump(Formatter *f) const { | |
384 | encode_json("number_of_replicas", num_replicas, f); | |
385 | encode_json("number_of_shards", num_shards, f); | |
386 | } | |
387 | }; | |
388 | ||
a8e16298 TL |
389 | struct es_index_config_base { |
390 | virtual ~es_index_config_base() {} | |
391 | virtual void dump(Formatter *f) const = 0; | |
392 | }; | |
393 | ||
394 | template <class T> | |
395 | struct es_index_config : public es_index_config_base { | |
31f18b77 | 396 | es_index_settings settings; |
a8e16298 | 397 | es_index_mappings<T> mappings; |
31f18b77 | 398 | |
a8e16298 | 399 | es_index_config(es_index_settings& _s) : settings(_s) {} |
31f18b77 FG |
400 | |
401 | void dump(Formatter *f) const { | |
402 | encode_json("settings", settings, f); | |
403 | encode_json("mappings", mappings, f); | |
404 | } | |
405 | }; | |
7c673cae | 406 | |
f64942e4 | 407 | static bool is_sys_attr(const std::string& attr_name){ |
11fdf7f2 TL |
408 | static constexpr std::initializer_list<const char*> rgw_sys_attrs = |
409 | {RGW_ATTR_PG_VER, | |
410 | RGW_ATTR_SOURCE_ZONE, | |
411 | RGW_ATTR_ID_TAG, | |
412 | RGW_ATTR_TEMPURL_KEY1, | |
413 | RGW_ATTR_TEMPURL_KEY2, | |
414 | RGW_ATTR_UNIX1, | |
415 | RGW_ATTR_UNIX_KEY1 | |
f64942e4 AA |
416 | }; |
417 | ||
418 | return std::find(rgw_sys_attrs.begin(), rgw_sys_attrs.end(), attr_name) != rgw_sys_attrs.end(); | |
419 | } | |
420 | ||
a8e16298 TL |
421 | static size_t attr_len(const bufferlist& val) |
422 | { | |
423 | size_t len = val.length(); | |
424 | if (len && val[len - 1] == '\0') { | |
425 | --len; | |
426 | } | |
427 | ||
428 | return len; | |
429 | } | |
430 | ||
7c673cae FG |
431 | struct es_obj_metadata { |
432 | CephContext *cct; | |
31f18b77 | 433 | ElasticConfigRef es_conf; |
7c673cae FG |
434 | RGWBucketInfo bucket_info; |
435 | rgw_obj_key key; | |
436 | ceph::real_time mtime; | |
437 | uint64_t size; | |
438 | map<string, bufferlist> attrs; | |
31f18b77 | 439 | uint64_t versioned_epoch; |
7c673cae | 440 | |
31f18b77 | 441 | es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info, |
7c673cae | 442 | const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size, |
31f18b77 FG |
443 | map<string, bufferlist>& _attrs, uint64_t _versioned_epoch) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key), |
444 | mtime(_mtime), size(_size), attrs(std::move(_attrs)), versioned_epoch(_versioned_epoch) {} | |
7c673cae FG |
445 | |
446 | void dump(Formatter *f) const { | |
447 | map<string, string> out_attrs; | |
448 | map<string, string> custom_meta; | |
449 | RGWAccessControlPolicy policy; | |
450 | set<string> permissions; | |
224ce89b | 451 | RGWObjTags obj_tags; |
7c673cae FG |
452 | |
453 | for (auto i : attrs) { | |
454 | const string& attr_name = i.first; | |
7c673cae FG |
455 | bufferlist& val = i.second; |
456 | ||
a8e16298 | 457 | if (!boost::algorithm::starts_with(attr_name, RGW_ATTR_PREFIX)) { |
7c673cae FG |
458 | continue; |
459 | } | |
460 | ||
a8e16298 | 461 | if (boost::algorithm::starts_with(attr_name, RGW_ATTR_META_PREFIX)) { |
f64942e4 | 462 | custom_meta.emplace(attr_name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1), |
a8e16298 TL |
463 | string(val.c_str(), attr_len(val))); |
464 | continue; | |
465 | } | |
466 | ||
467 | if (boost::algorithm::starts_with(attr_name, RGW_ATTR_CRYPT_PREFIX)) { | |
7c673cae FG |
468 | continue; |
469 | } | |
470 | ||
a8e16298 TL |
471 | if (boost::algorithm::starts_with(attr_name, RGW_ATTR_OLH_PREFIX)) { |
472 | // skip versioned object olh info | |
f64942e4 AA |
473 | continue; |
474 | } | |
7c673cae | 475 | |
f64942e4 | 476 | if (attr_name == RGW_ATTR_ACL) { |
7c673cae | 477 | try { |
11fdf7f2 TL |
478 | auto i = val.cbegin(); |
479 | decode(policy, i); | |
7c673cae FG |
480 | } catch (buffer::error& err) { |
481 | ldout(cct, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl; | |
f64942e4 | 482 | continue; |
7c673cae FG |
483 | } |
484 | ||
485 | const RGWAccessControlList& acl = policy.get_acl(); | |
486 | ||
487 | permissions.insert(policy.get_owner().get_id().to_str()); | |
488 | for (auto acliter : acl.get_grant_map()) { | |
489 | const ACLGrant& grant = acliter.second; | |
490 | if (grant.get_type().get_type() == ACL_TYPE_CANON_USER && | |
491 | ((uint32_t)grant.get_permission().get_permissions() & RGW_PERM_READ) != 0) { | |
492 | rgw_user user; | |
493 | if (grant.get_id(user)) { | |
494 | permissions.insert(user.to_str()); | |
495 | } | |
496 | } | |
497 | } | |
f64942e4 AA |
498 | } else if (attr_name == RGW_ATTR_TAGS) { |
499 | try { | |
11fdf7f2 TL |
500 | auto tags_bl = val.cbegin(); |
501 | decode(obj_tags, tags_bl); | |
f64942e4 AA |
502 | } catch (buffer::error& err) { |
503 | ldout(cct,0) << "ERROR: failed to decode obj tags for " | |
504 | << bucket_info.bucket << "/" << key << dendl; | |
505 | continue; | |
506 | } | |
507 | } else if (attr_name == RGW_ATTR_COMPRESSION) { | |
28e407b8 | 508 | RGWCompressionInfo cs_info; |
f64942e4 | 509 | try { |
11fdf7f2 TL |
510 | auto vals_bl = val.cbegin(); |
511 | decode(cs_info, vals_bl); | |
f64942e4 AA |
512 | } catch (buffer::error& err) { |
513 | ldout(cct,0) << "ERROR: failed to decode compression attr for " | |
514 | << bucket_info.bucket << "/" << key << dendl; | |
515 | continue; | |
516 | } | |
517 | out_attrs.emplace("compression",std::move(cs_info.compression_type)); | |
7c673cae | 518 | } else { |
f64942e4 AA |
519 | if (!is_sys_attr(attr_name)) { |
520 | out_attrs.emplace(attr_name.substr(sizeof(RGW_ATTR_PREFIX) - 1), | |
a8e16298 | 521 | std::string(val.c_str(), attr_len(val))); |
7c673cae FG |
522 | } |
523 | } | |
524 | } | |
525 | ::encode_json("bucket", bucket_info.bucket.name, f); | |
526 | ::encode_json("name", key.name, f); | |
a8e16298 TL |
527 | string instance = key.instance; |
528 | if (instance.empty()) | |
529 | instance = "null"; | |
530 | ::encode_json("instance", instance, f); | |
31f18b77 | 531 | ::encode_json("versioned_epoch", versioned_epoch, f); |
7c673cae FG |
532 | ::encode_json("owner", policy.get_owner(), f); |
533 | ::encode_json("permissions", permissions, f); | |
534 | f->open_object_section("meta"); | |
535 | ::encode_json("size", size, f); | |
536 | ||
537 | string mtime_str; | |
538 | rgw_to_iso8601(mtime, &mtime_str); | |
539 | ::encode_json("mtime", mtime_str, f); | |
540 | for (auto i : out_attrs) { | |
541 | ::encode_json(i.first.c_str(), i.second, f); | |
542 | } | |
31f18b77 FG |
543 | map<string, string> custom_str; |
544 | map<string, string> custom_int; | |
545 | map<string, string> custom_date; | |
546 | ||
547 | for (auto i : custom_meta) { | |
548 | auto config = bucket_info.mdsearch_config.find(i.first); | |
549 | if (config == bucket_info.mdsearch_config.end()) { | |
550 | if (!es_conf->explicit_custom_meta) { | |
551 | /* default custom meta is of type string */ | |
552 | custom_str[i.first] = i.second; | |
553 | } else { | |
554 | ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl; | |
555 | } | |
556 | continue; | |
557 | } | |
558 | switch (config->second) { | |
559 | case ESEntityTypeMap::ES_ENTITY_DATE: | |
560 | custom_date[i.first] = i.second; | |
561 | break; | |
562 | case ESEntityTypeMap::ES_ENTITY_INT: | |
563 | custom_int[i.first] = i.second; | |
564 | break; | |
565 | default: | |
566 | custom_str[i.first] = i.second; | |
567 | } | |
568 | } | |
569 | ||
570 | if (!custom_str.empty()) { | |
571 | f->open_array_section("custom-string"); | |
572 | for (auto i : custom_str) { | |
573 | f->open_object_section("entity"); | |
574 | ::encode_json("name", i.first.c_str(), f); | |
575 | ::encode_json("value", i.second, f); | |
576 | f->close_section(); | |
577 | } | |
578 | f->close_section(); | |
579 | } | |
580 | if (!custom_int.empty()) { | |
581 | f->open_array_section("custom-int"); | |
582 | for (auto i : custom_int) { | |
583 | f->open_object_section("entity"); | |
584 | ::encode_json("name", i.first.c_str(), f); | |
585 | ::encode_json("value", i.second, f); | |
586 | f->close_section(); | |
587 | } | |
588 | f->close_section(); | |
589 | } | |
590 | if (!custom_date.empty()) { | |
591 | f->open_array_section("custom-date"); | |
592 | for (auto i : custom_date) { | |
593 | /* | |
594 | * try to exlicitly parse date field, otherwise elasticsearch could reject the whole doc, | |
595 | * which will end up with failed sync | |
596 | */ | |
597 | real_time t; | |
598 | int r = parse_time(i.second.c_str(), &t); | |
599 | if (r < 0) { | |
600 | ldout(cct, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl; | |
601 | continue; | |
602 | } | |
603 | ||
604 | string time_str; | |
605 | rgw_to_iso8601(t, &time_str); | |
606 | ||
607 | f->open_object_section("entity"); | |
608 | ::encode_json("name", i.first.c_str(), f); | |
609 | ::encode_json("value", time_str.c_str(), f); | |
610 | f->close_section(); | |
7c673cae FG |
611 | } |
612 | f->close_section(); | |
613 | } | |
224ce89b WB |
614 | f->close_section(); // meta |
615 | const auto& m = obj_tags.get_tags(); | |
616 | if (m.size() > 0){ | |
617 | f->open_array_section("tagging"); | |
618 | for (const auto &it : m) { | |
619 | f->open_object_section("tag"); | |
620 | ::encode_json("key", it.first, f); | |
621 | ::encode_json("value",it.second, f); | |
622 | f->close_section(); | |
623 | } | |
624 | f->close_section(); // tagging | |
625 | } | |
7c673cae | 626 | } |
31f18b77 FG |
627 | }; |
628 | ||
629 | class RGWElasticInitConfigCBCR : public RGWCoroutine { | |
630 | RGWDataSyncEnv *sync_env; | |
631 | ElasticConfigRef conf; | |
a8e16298 TL |
632 | ESInfo es_info; |
633 | ||
634 | struct _err_response { | |
635 | struct err_reason { | |
636 | vector<err_reason> root_cause; | |
637 | string type; | |
638 | string reason; | |
639 | string index; | |
640 | ||
641 | void decode_json(JSONObj *obj) { | |
642 | JSONDecoder::decode_json("root_cause", root_cause, obj); | |
643 | JSONDecoder::decode_json("type", type, obj); | |
644 | JSONDecoder::decode_json("reason", reason, obj); | |
645 | JSONDecoder::decode_json("index", index, obj); | |
646 | } | |
647 | } error; | |
648 | ||
649 | void decode_json(JSONObj *obj) { | |
650 | JSONDecoder::decode_json("error", error, obj); | |
651 | } | |
652 | } err_response; | |
653 | ||
31f18b77 FG |
654 | public: |
655 | RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env, | |
656 | ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), | |
657 | sync_env(_sync_env), | |
658 | conf(_conf) {} | |
659 | int operate() override { | |
660 | reenter(this) { | |
661 | ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl; | |
a8e16298 TL |
662 | yield call(new RGWReadRESTResourceCR<ESInfo> (sync_env->cct, |
663 | conf->conn.get(), | |
664 | sync_env->http_manager, | |
665 | "/", nullptr /*params*/, | |
666 | &(conf->default_headers), | |
667 | &es_info)); | |
668 | if (retcode < 0) { | |
669 | return set_cr_error(retcode); | |
670 | } | |
671 | ||
31f18b77 FG |
672 | yield { |
673 | string path = conf->get_index_path(); | |
a8e16298 | 674 | ldout(sync_env->cct, 5) << "got elastic version=" << es_info.get_version_str() << dendl; |
31f18b77 FG |
675 | |
676 | es_index_settings settings(conf->num_replicas, conf->num_shards); | |
31f18b77 | 677 | |
a8e16298 | 678 | std::unique_ptr<es_index_config_base> index_conf; |
31f18b77 | 679 | |
a8e16298 TL |
680 | if (es_info.version >= ES_V5) { |
681 | ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl; | |
682 | index_conf.reset(new es_index_config<es_type_v5>(settings)); | |
683 | } else { | |
684 | ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl; | |
685 | index_conf.reset(new es_index_config<es_type_v2>(settings)); | |
686 | } | |
687 | call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sync_env->cct, | |
688 | conf->conn.get(), | |
689 | sync_env->http_manager, | |
690 | path, nullptr /*params*/, | |
691 | &(conf->default_headers), | |
692 | *index_conf, nullptr, &err_response)); | |
31f18b77 FG |
693 | } |
694 | if (retcode < 0) { | |
a8e16298 TL |
695 | ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl; |
696 | ||
697 | if (err_response.error.type != "index_already_exists_exception") { | |
698 | return set_cr_error(retcode); | |
699 | } | |
700 | ||
701 | ldout(sync_env->cct, 0) << "elasticsearch: index already exists, assuming external initialization" << dendl; | |
31f18b77 FG |
702 | } |
703 | return set_cr_done(); | |
704 | } | |
705 | return 0; | |
706 | } | |
7c673cae FG |
707 | |
708 | }; | |
709 | ||
710 | class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { | |
31f18b77 FG |
711 | ElasticConfigRef conf; |
712 | uint64_t versioned_epoch; | |
7c673cae FG |
713 | public: |
714 | RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, | |
715 | RGWBucketInfo& _bucket_info, rgw_obj_key& _key, | |
31f18b77 FG |
716 | ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf), |
717 | versioned_epoch(_versioned_epoch) {} | |
7c673cae FG |
718 | int operate() override { |
719 | reenter(this) { | |
31f18b77 | 720 | ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone |
a8e16298 TL |
721 | << " b=" << bucket_info.bucket << " k=" << key |
722 | << " size=" << size << " mtime=" << mtime << dendl; | |
723 | ||
7c673cae | 724 | yield { |
31f18b77 FG |
725 | string path = conf->get_obj_path(bucket_info, key); |
726 | es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch); | |
7c673cae | 727 | |
31f18b77 | 728 | call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(), |
7c673cae FG |
729 | sync_env->http_manager, |
730 | path, nullptr /* params */, | |
a8e16298 | 731 | &(conf->default_headers), |
7c673cae FG |
732 | doc, nullptr /* result */)); |
733 | ||
734 | } | |
735 | if (retcode < 0) { | |
736 | return set_cr_error(retcode); | |
737 | } | |
738 | return set_cr_done(); | |
739 | } | |
740 | return 0; | |
741 | } | |
7c673cae FG |
742 | }; |
743 | ||
744 | class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR { | |
31f18b77 FG |
745 | ElasticConfigRef conf; |
746 | uint64_t versioned_epoch; | |
7c673cae FG |
747 | public: |
748 | RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, | |
749 | RGWBucketInfo& _bucket_info, rgw_obj_key& _key, | |
31f18b77 FG |
750 | ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), |
751 | conf(_conf), versioned_epoch(_versioned_epoch) { | |
7c673cae FG |
752 | } |
753 | ||
754 | ~RGWElasticHandleRemoteObjCR() override {} | |
755 | ||
756 | RGWStatRemoteObjCBCR *allocate_callback() override { | |
31f18b77 | 757 | return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch); |
7c673cae FG |
758 | } |
759 | }; | |
760 | ||
761 | class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine { | |
762 | RGWDataSyncEnv *sync_env; | |
763 | RGWBucketInfo bucket_info; | |
764 | rgw_obj_key key; | |
765 | ceph::real_time mtime; | |
31f18b77 | 766 | ElasticConfigRef conf; |
7c673cae FG |
767 | public: |
768 | RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env, | |
769 | RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime, | |
31f18b77 | 770 | ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), |
7c673cae FG |
771 | bucket_info(_bucket_info), key(_key), |
772 | mtime(_mtime), conf(_conf) {} | |
773 | int operate() override { | |
774 | reenter(this) { | |
31f18b77 FG |
775 | ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone |
776 | << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl; | |
7c673cae | 777 | yield { |
31f18b77 | 778 | string path = conf->get_obj_path(bucket_info, key); |
7c673cae | 779 | |
31f18b77 | 780 | call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(), |
7c673cae FG |
781 | sync_env->http_manager, |
782 | path, nullptr /* params */)); | |
783 | } | |
784 | if (retcode < 0) { | |
785 | return set_cr_error(retcode); | |
786 | } | |
787 | return set_cr_done(); | |
788 | } | |
789 | return 0; | |
790 | } | |
791 | ||
792 | }; | |
793 | ||
794 | class RGWElasticDataSyncModule : public RGWDataSyncModule { | |
31f18b77 | 795 | ElasticConfigRef conf; |
7c673cae | 796 | public: |
11fdf7f2 | 797 | RGWElasticDataSyncModule(CephContext *cct, const JSONFormattable& config) : conf(std::make_shared<ElasticConfig>()) { |
31f18b77 | 798 | conf->init(cct, config); |
7c673cae | 799 | } |
31f18b77 FG |
800 | ~RGWElasticDataSyncModule() override {} |
801 | ||
802 | void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override { | |
11fdf7f2 | 803 | conf->init_instance(sync_env->store->svc.zone->get_realm(), instance_id); |
7c673cae FG |
804 | } |
805 | ||
31f18b77 FG |
806 | RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override { |
807 | ldout(sync_env->cct, 5) << conf->id << ": init" << dendl; | |
808 | return new RGWElasticInitConfigCBCR(sync_env, conf); | |
809 | } | |
11fdf7f2 | 810 | RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override { |
91327a77 | 811 | ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; |
31f18b77 FG |
812 | if (!conf->should_handle_operation(bucket_info)) { |
813 | ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; | |
814 | return nullptr; | |
815 | } | |
91327a77 | 816 | return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch.value_or(0)); |
7c673cae | 817 | } |
31f18b77 | 818 | RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { |
7c673cae | 819 | /* versioned and versioned epoch params are useless in the elasticsearch backend case */ |
31f18b77 FG |
820 | ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; |
821 | if (!conf->should_handle_operation(bucket_info)) { | |
822 | ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; | |
823 | return nullptr; | |
824 | } | |
7c673cae FG |
825 | return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf); |
826 | } | |
827 | RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, | |
31f18b77 FG |
828 | rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { |
829 | ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime | |
7c673cae | 830 | << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; |
31f18b77 | 831 | ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl; |
7c673cae FG |
832 | return NULL; |
833 | } | |
31f18b77 FG |
834 | RGWRESTConn *get_rest_conn() { |
835 | return conf->conn.get(); | |
836 | } | |
7c673cae | 837 | |
31f18b77 FG |
838 | string get_index_path() { |
839 | return conf->get_index_path(); | |
7c673cae | 840 | } |
a8e16298 TL |
841 | |
842 | map<string, string>& get_request_headers() { | |
843 | return conf->get_request_headers(); | |
844 | } | |
7c673cae FG |
845 | }; |
846 | ||
11fdf7f2 | 847 | RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const JSONFormattable& config) |
31f18b77 FG |
848 | { |
849 | data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, config)); | |
850 | } | |
851 | ||
852 | RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler() | |
853 | { | |
854 | return data_handler.get(); | |
855 | } | |
856 | ||
857 | RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn() | |
858 | { | |
859 | return data_handler->get_rest_conn(); | |
860 | } | |
861 | ||
862 | string RGWElasticSyncModuleInstance::get_index_path() { | |
863 | return data_handler->get_index_path(); | |
864 | } | |
865 | ||
a8e16298 TL |
866 | map<string, string>& RGWElasticSyncModuleInstance::get_request_headers() { |
867 | return data_handler->get_request_headers(); | |
868 | } | |
869 | ||
31f18b77 FG |
870 | RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) { |
871 | if (dialect != RGW_REST_S3) { | |
872 | return orig; | |
873 | } | |
874 | delete orig; | |
875 | return new RGWRESTMgr_MDSearch_S3(); | |
876 | } | |
877 | ||
11fdf7f2 TL |
878 | int RGWElasticSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { |
879 | string endpoint = config["endpoint"]; | |
31f18b77 | 880 | instance->reset(new RGWElasticSyncModuleInstance(cct, config)); |
7c673cae FG |
881 | return 0; |
882 | } | |
883 |