]>
Commit | Line | Data |
---|---|---|
a8e16298 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
a8e16298 TL |
3 | |
4 | #include "rgw_b64.h" | |
7c673cae FG |
5 | #include "rgw_common.h" |
6 | #include "rgw_coroutine.h" | |
7 | #include "rgw_sync_module.h" | |
8 | #include "rgw_data_sync.h" | |
7c673cae | 9 | #include "rgw_sync_module_es.h" |
31f18b77 | 10 | #include "rgw_sync_module_es_rest.h" |
7c673cae FG |
11 | #include "rgw_rest_conn.h" |
12 | #include "rgw_cr_rest.h" | |
31f18b77 FG |
13 | #include "rgw_op.h" |
14 | #include "rgw_es_query.h" | |
11fdf7f2 TL |
15 | #include "rgw_zone.h" |
16 | ||
17 | #include "services/svc_zone.h" | |
31f18b77 FG |
18 | |
19 | #include "include/str_list.h" | |
20 | ||
21 | #include <boost/asio/yield.hpp> | |
7c673cae FG |
22 | |
23 | #define dout_subsys ceph_subsys_rgw | |
24 | ||
31f18b77 FG |
25 | |
26 | /* | |
27 | * whitelist utility. Config string is a list of entries, where an entry is either an item, | |
28 | * a prefix, or a suffix. An item would be the name of the entity that we'd look up, | |
29 | * a prefix would be a string ending with an asterisk, a suffix would be a string starting | |
30 | * with an asterisk. For example: | |
31 | * | |
32 | * bucket1, bucket2, foo*, *bar | |
33 | */ | |
34 | class ItemList { | |
35 | bool approve_all{false}; | |
36 | ||
37 | set<string> entries; | |
38 | set<string> prefixes; | |
39 | set<string> suffixes; | |
40 | ||
41 | void parse(const string& str) { | |
42 | list<string> l; | |
43 | ||
44 | get_str_list(str, ",", l); | |
45 | ||
46 | for (auto& entry : l) { | |
47 | entry = rgw_trim_whitespace(entry); | |
48 | if (entry.empty()) { | |
49 | continue; | |
50 | } | |
51 | ||
52 | if (entry == "*") { | |
53 | approve_all = true; | |
54 | return; | |
55 | } | |
56 | ||
57 | if (entry[0] == '*') { | |
58 | suffixes.insert(entry.substr(1)); | |
59 | continue; | |
60 | } | |
61 | ||
62 | if (entry.back() == '*') { | |
63 | prefixes.insert(entry.substr(0, entry.size() - 1)); | |
64 | continue; | |
65 | } | |
66 | ||
67 | entries.insert(entry); | |
68 | } | |
69 | } | |
70 | ||
71 | public: | |
72 | ItemList() {} | |
73 | void init(const string& str, bool def_val) { | |
74 | if (str.empty()) { | |
75 | approve_all = def_val; | |
76 | } else { | |
77 | parse(str); | |
78 | } | |
79 | } | |
80 | ||
81 | bool exists(const string& entry) { | |
82 | if (approve_all) { | |
83 | return true; | |
84 | } | |
85 | ||
86 | if (entries.find(entry) != entries.end()) { | |
87 | return true; | |
88 | } | |
89 | ||
90 | auto i = prefixes.upper_bound(entry); | |
91 | if (i != prefixes.begin()) { | |
92 | --i; | |
93 | if (boost::algorithm::starts_with(entry, *i)) { | |
94 | return true; | |
95 | } | |
96 | } | |
97 | ||
98 | for (i = suffixes.begin(); i != suffixes.end(); ++i) { | |
99 | if (boost::algorithm::ends_with(entry, *i)) { | |
100 | return true; | |
101 | } | |
102 | } | |
103 | ||
104 | return false; | |
105 | } | |
106 | }; | |
107 | ||
/* Index creation defaults; a configured num_shards below ES_NUM_SHARDS_MIN is raised to it. */
#define ES_NUM_SHARDS_DEFAULT 16
#define ES_NUM_SHARDS_MIN 5
#define ES_NUM_REPLICAS_DEFAULT 1

/* Elasticsearch server version as (major, minor), compared lexicographically. */
using ESVersion = std::pair<int, int>;
static constexpr ESVersion ES_V5{5, 0};  // threshold for es_type_v5 mappings
static constexpr ESVersion ES_V7{7, 0};  // threshold for typeless ("_doc") document paths
116 | |
117 | struct ESInfo { | |
118 | std::string name; | |
119 | std::string cluster_name; | |
120 | std::string cluster_uuid; | |
121 | ESVersion version; | |
122 | ||
123 | void decode_json(JSONObj *obj); | |
124 | ||
125 | std::string get_version_str(){ | |
126 | return std::to_string(version.first) + "." + std::to_string(version.second); | |
127 | } | |
128 | }; | |
129 | ||
130 | // simple wrapper structure to wrap the es version nested type | |
131 | struct es_version_decoder { | |
132 | ESVersion version; | |
133 | ||
134 | int parse_version(const std::string& s) { | |
135 | int major, minor; | |
136 | int ret = sscanf(s.c_str(), "%d.%d", &major, &minor); | |
137 | if (ret < 0) { | |
138 | return ret; | |
139 | } | |
140 | version = std::make_pair(major,minor); | |
141 | return 0; | |
142 | } | |
143 | ||
144 | void decode_json(JSONObj *obj) { | |
145 | std::string s; | |
146 | JSONDecoder::decode_json("number",s,obj); | |
147 | if (parse_version(s) < 0) | |
148 | throw JSONDecoder::err("Failed to parse ElasticVersion"); | |
149 | } | |
150 | }; | |
151 | ||
152 | ||
153 | void ESInfo::decode_json(JSONObj *obj) | |
154 | { | |
155 | JSONDecoder::decode_json("name", name, obj); | |
156 | JSONDecoder::decode_json("cluster_name", cluster_name, obj); | |
157 | JSONDecoder::decode_json("cluster_uuid", cluster_uuid, obj); | |
158 | es_version_decoder esv; | |
159 | JSONDecoder::decode_json("version", esv, obj); | |
160 | version = std::move(esv.version); | |
161 | } | |
162 | ||
7c673cae | 163 | struct ElasticConfig { |
31f18b77 | 164 | uint64_t sync_instance{0}; |
7c673cae | 165 | string id; |
31f18b77 FG |
166 | string index_path; |
167 | std::unique_ptr<RGWRESTConn> conn; | |
168 | bool explicit_custom_meta{true}; | |
169 | string override_index_path; | |
170 | ItemList index_buckets; | |
171 | ItemList allow_owners; | |
172 | uint32_t num_shards{0}; | |
173 | uint32_t num_replicas{0}; | |
a8e16298 | 174 | std::map <string,string> default_headers = {{ "Content-Type", "application/json" }}; |
eafe8130 | 175 | ESInfo es_info; |
31f18b77 | 176 | |
11fdf7f2 TL |
177 | void init(CephContext *cct, const JSONFormattable& config) { |
178 | string elastic_endpoint = config["endpoint"]; | |
31f18b77 FG |
179 | id = string("elastic:") + elastic_endpoint; |
180 | conn.reset(new RGWRESTConn(cct, nullptr, id, { elastic_endpoint })); | |
11fdf7f2 TL |
181 | explicit_custom_meta = config["explicit_custom_meta"](true); |
182 | index_buckets.init(config["index_buckets_list"], true); /* approve all buckets by default */ | |
183 | allow_owners.init(config["approved_owners_list"], true); /* approve all bucket owners by default */ | |
184 | override_index_path = config["override_index_path"]; | |
185 | num_shards = config["num_shards"](ES_NUM_SHARDS_DEFAULT); | |
31f18b77 FG |
186 | if (num_shards < ES_NUM_SHARDS_MIN) { |
187 | num_shards = ES_NUM_SHARDS_MIN; | |
188 | } | |
11fdf7f2 TL |
189 | num_replicas = config["num_replicas"](ES_NUM_REPLICAS_DEFAULT); |
190 | if (string user = config["username"], pw = config["password"]; | |
191 | !user.empty() && !pw.empty()) { | |
a8e16298 TL |
192 | auto auth_string = user + ":" + pw; |
193 | default_headers.emplace("AUTHORIZATION", "Basic " + rgw::to_base64(auth_string)); | |
194 | } | |
11fdf7f2 | 195 | |
31f18b77 FG |
196 | } |
197 | ||
11fdf7f2 | 198 | void init_instance(const RGWRealm& realm, uint64_t instance_id) { |
31f18b77 FG |
199 | sync_instance = instance_id; |
200 | ||
201 | if (!override_index_path.empty()) { | |
202 | index_path = override_index_path; | |
203 | return; | |
204 | } | |
205 | ||
206 | char buf[32]; | |
207 | snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF)); | |
208 | ||
209 | index_path = "/rgw-" + realm.get_name() + buf; | |
210 | } | |
211 | ||
212 | string get_index_path() { | |
213 | return index_path; | |
214 | } | |
215 | ||
a8e16298 TL |
216 | map<string, string>& get_request_headers() { |
217 | return default_headers; | |
218 | } | |
219 | ||
31f18b77 | 220 | string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) { |
eafe8130 TL |
221 | if (es_info.version >= ES_V7) { |
222 | return index_path+ "/_doc/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance)); | |
223 | ; | |
224 | } else { | |
225 | return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance)); | |
226 | } | |
31f18b77 FG |
227 | } |
228 | ||
229 | bool should_handle_operation(RGWBucketInfo& bucket_info) { | |
230 | return index_buckets.exists(bucket_info.bucket.name) && | |
231 | allow_owners.exists(bucket_info.owner.to_str()); | |
232 | } | |
7c673cae FG |
233 | }; |
234 | ||
31f18b77 FG |
235 | using ElasticConfigRef = std::shared_ptr<ElasticConfig>; |
236 | ||
a8e16298 TL |
237 | static const char *es_type_to_str(const ESType& t) { |
238 | switch (t) { | |
239 | case ESType::String: return "string"; | |
240 | case ESType::Text: return "text"; | |
241 | case ESType::Keyword: return "keyword"; | |
242 | case ESType::Long: return "long"; | |
243 | case ESType::Integer: return "integer"; | |
244 | case ESType::Short: return "short"; | |
245 | case ESType::Byte: return "byte"; | |
246 | case ESType::Double: return "double"; | |
247 | case ESType::Float: return "float"; | |
248 | case ESType::Half_Float: return "half_float"; | |
249 | case ESType::Scaled_Float: return "scaled_float"; | |
250 | case ESType::Date: return "date"; | |
251 | case ESType::Boolean: return "boolean"; | |
252 | case ESType::Integer_Range: return "integer_range"; | |
253 | case ESType::Float_Range: return "float_range"; | |
254 | case ESType::Double_Range: return "date_range"; | |
255 | case ESType::Date_Range: return "date_range"; | |
256 | case ESType::Geo_Point: return "geo_point"; | |
257 | case ESType::Ip: return "ip"; | |
258 | default: | |
259 | return "<unknown>"; | |
260 | } | |
261 | } | |
262 | ||
263 | struct es_type_v2 { | |
264 | ESType estype; | |
265 | const char *format{nullptr}; | |
11fdf7f2 | 266 | std::optional<bool> analyzed; |
a8e16298 TL |
267 | |
268 | es_type_v2(ESType et) : estype(et) {} | |
269 | ||
270 | void dump(Formatter *f) const { | |
271 | const char *type_str = es_type_to_str(estype); | |
272 | encode_json("type", type_str, f); | |
273 | if (format) { | |
274 | encode_json("format", format, f); | |
275 | } | |
276 | ||
277 | auto is_analyzed = analyzed; | |
278 | ||
279 | if (estype == ESType::String && | |
280 | !is_analyzed) { | |
281 | is_analyzed = false; | |
282 | } | |
283 | ||
284 | if (is_analyzed) { | |
285 | encode_json("index", (is_analyzed.value() ? "analyzed" : "not_analyzed"), f); | |
286 | } | |
287 | } | |
288 | }; | |
31f18b77 | 289 | |
a8e16298 TL |
290 | struct es_type_v5 { |
291 | ESType estype; | |
292 | const char *format{nullptr}; | |
11fdf7f2 TL |
293 | std::optional<bool> analyzed; |
294 | std::optional<bool> index; | |
a8e16298 TL |
295 | |
296 | es_type_v5(ESType et) : estype(et) {} | |
31f18b77 FG |
297 | |
298 | void dump(Formatter *f) const { | |
a8e16298 TL |
299 | ESType new_estype; |
300 | if (estype != ESType::String) { | |
301 | new_estype = estype; | |
302 | } else { | |
303 | bool is_analyzed = analyzed.value_or(false); | |
304 | new_estype = (is_analyzed ? ESType::Text : ESType::Keyword); | |
305 | /* index = true; ... Not setting index=true, because that's the default, | |
306 | * and dumping a boolean value *might* be a problem when backporting this | |
307 | * because value might get quoted | |
308 | */ | |
309 | } | |
310 | ||
311 | const char *type_str = es_type_to_str(new_estype); | |
312 | encode_json("type", type_str, f); | |
31f18b77 FG |
313 | if (format) { |
314 | encode_json("format", format, f); | |
315 | } | |
a8e16298 TL |
316 | if (index) { |
317 | encode_json("index", index.value(), f); | |
31f18b77 FG |
318 | } |
319 | } | |
320 | }; | |
321 | ||
/* Adds chainable setters on top of a field mapping flavour (es_type_v2/v5). */
template <class T>
struct es_type : public T {
  es_type(T t) : T(t) {}

  es_type& set_format(const char *f) {
    this->format = f;
    return *this;
  }

  es_type& set_analyzed(bool a) {
    this->analyzed = a;
    return *this;
  }
};
335 | ||
336 | template <class T> | |
31f18b77 | 337 | struct es_index_mappings { |
eafe8130 | 338 | ESVersion es_version; |
a8e16298 TL |
339 | ESType string_type {ESType::String}; |
340 | ||
eafe8130 TL |
341 | es_index_mappings(ESVersion esv):es_version(esv) { |
342 | } | |
343 | ||
a8e16298 TL |
344 | es_type<T> est(ESType t) const { |
345 | return es_type<T>(t); | |
346 | } | |
347 | ||
348 | void dump_custom(const char *section, ESType type, const char *format, Formatter *f) const { | |
31f18b77 FG |
349 | f->open_object_section(section); |
350 | ::encode_json("type", "nested", f); | |
351 | f->open_object_section("properties"); | |
a8e16298 TL |
352 | encode_json("name", est(string_type), f); |
353 | encode_json("value", est(type).set_format(format), f); | |
31f18b77 FG |
354 | f->close_section(); // entry |
355 | f->close_section(); // custom-string | |
356 | } | |
a8e16298 | 357 | |
31f18b77 | 358 | void dump(Formatter *f) const { |
eafe8130 TL |
359 | if (es_version <= ES_V7) |
360 | f->open_object_section("object"); | |
31f18b77 | 361 | f->open_object_section("properties"); |
a8e16298 TL |
362 | encode_json("bucket", est(string_type), f); |
363 | encode_json("name", est(string_type), f); | |
364 | encode_json("instance", est(string_type), f); | |
365 | encode_json("versioned_epoch", est(ESType::Long), f); | |
31f18b77 FG |
366 | f->open_object_section("meta"); |
367 | f->open_object_section("properties"); | |
a8e16298 TL |
368 | encode_json("cache_control", est(string_type), f); |
369 | encode_json("content_disposition", est(string_type), f); | |
370 | encode_json("content_encoding", est(string_type), f); | |
371 | encode_json("content_language", est(string_type), f); | |
372 | encode_json("content_type", est(string_type), f); | |
373 | encode_json("storage_class", est(string_type), f); | |
374 | encode_json("etag", est(string_type), f); | |
375 | encode_json("expires", est(string_type), f); | |
376 | encode_json("mtime", est(ESType::Date) | |
377 | .set_format("strict_date_optional_time||epoch_millis"), f); | |
378 | encode_json("size", est(ESType::Long), f); | |
379 | dump_custom("custom-string", string_type, nullptr, f); | |
380 | dump_custom("custom-int", ESType::Long, nullptr, f); | |
381 | dump_custom("custom-date", ESType::Date, "strict_date_optional_time||epoch_millis", f); | |
31f18b77 FG |
382 | f->close_section(); // properties |
383 | f->close_section(); // meta | |
384 | f->close_section(); // properties | |
eafe8130 TL |
385 | |
386 | if (es_version <= ES_V7) | |
31f18b77 FG |
387 | f->close_section(); // object |
388 | } | |
389 | }; | |
390 | ||
391 | struct es_index_settings { | |
392 | uint32_t num_replicas; | |
393 | uint32_t num_shards; | |
394 | ||
395 | es_index_settings(uint32_t _replicas, uint32_t _shards) : num_replicas(_replicas), num_shards(_shards) {} | |
396 | ||
397 | void dump(Formatter *f) const { | |
398 | encode_json("number_of_replicas", num_replicas, f); | |
399 | encode_json("number_of_shards", num_shards, f); | |
400 | } | |
401 | }; | |
402 | ||
a8e16298 TL |
403 | struct es_index_config_base { |
404 | virtual ~es_index_config_base() {} | |
405 | virtual void dump(Formatter *f) const = 0; | |
406 | }; | |
407 | ||
408 | template <class T> | |
409 | struct es_index_config : public es_index_config_base { | |
31f18b77 | 410 | es_index_settings settings; |
a8e16298 | 411 | es_index_mappings<T> mappings; |
31f18b77 | 412 | |
eafe8130 TL |
413 | es_index_config(es_index_settings& _s, ESVersion esv) : settings(_s), mappings(esv) { |
414 | } | |
31f18b77 FG |
415 | |
416 | void dump(Formatter *f) const { | |
417 | encode_json("settings", settings, f); | |
418 | encode_json("mappings", mappings, f); | |
419 | } | |
420 | }; | |
7c673cae | 421 | |
f64942e4 | 422 | static bool is_sys_attr(const std::string& attr_name){ |
11fdf7f2 TL |
423 | static constexpr std::initializer_list<const char*> rgw_sys_attrs = |
424 | {RGW_ATTR_PG_VER, | |
425 | RGW_ATTR_SOURCE_ZONE, | |
426 | RGW_ATTR_ID_TAG, | |
427 | RGW_ATTR_TEMPURL_KEY1, | |
428 | RGW_ATTR_TEMPURL_KEY2, | |
429 | RGW_ATTR_UNIX1, | |
430 | RGW_ATTR_UNIX_KEY1 | |
f64942e4 AA |
431 | }; |
432 | ||
433 | return std::find(rgw_sys_attrs.begin(), rgw_sys_attrs.end(), attr_name) != rgw_sys_attrs.end(); | |
434 | } | |
435 | ||
a8e16298 TL |
436 | static size_t attr_len(const bufferlist& val) |
437 | { | |
438 | size_t len = val.length(); | |
439 | if (len && val[len - 1] == '\0') { | |
440 | --len; | |
441 | } | |
442 | ||
443 | return len; | |
444 | } | |
445 | ||
7c673cae FG |
446 | struct es_obj_metadata { |
447 | CephContext *cct; | |
31f18b77 | 448 | ElasticConfigRef es_conf; |
7c673cae FG |
449 | RGWBucketInfo bucket_info; |
450 | rgw_obj_key key; | |
451 | ceph::real_time mtime; | |
452 | uint64_t size; | |
453 | map<string, bufferlist> attrs; | |
31f18b77 | 454 | uint64_t versioned_epoch; |
7c673cae | 455 | |
31f18b77 | 456 | es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info, |
7c673cae | 457 | const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size, |
31f18b77 FG |
458 | map<string, bufferlist>& _attrs, uint64_t _versioned_epoch) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key), |
459 | mtime(_mtime), size(_size), attrs(std::move(_attrs)), versioned_epoch(_versioned_epoch) {} | |
7c673cae FG |
460 | |
461 | void dump(Formatter *f) const { | |
462 | map<string, string> out_attrs; | |
463 | map<string, string> custom_meta; | |
464 | RGWAccessControlPolicy policy; | |
465 | set<string> permissions; | |
224ce89b | 466 | RGWObjTags obj_tags; |
7c673cae FG |
467 | |
468 | for (auto i : attrs) { | |
469 | const string& attr_name = i.first; | |
7c673cae FG |
470 | bufferlist& val = i.second; |
471 | ||
a8e16298 | 472 | if (!boost::algorithm::starts_with(attr_name, RGW_ATTR_PREFIX)) { |
7c673cae FG |
473 | continue; |
474 | } | |
475 | ||
a8e16298 | 476 | if (boost::algorithm::starts_with(attr_name, RGW_ATTR_META_PREFIX)) { |
f64942e4 | 477 | custom_meta.emplace(attr_name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1), |
a8e16298 TL |
478 | string(val.c_str(), attr_len(val))); |
479 | continue; | |
480 | } | |
481 | ||
482 | if (boost::algorithm::starts_with(attr_name, RGW_ATTR_CRYPT_PREFIX)) { | |
7c673cae FG |
483 | continue; |
484 | } | |
485 | ||
a8e16298 TL |
486 | if (boost::algorithm::starts_with(attr_name, RGW_ATTR_OLH_PREFIX)) { |
487 | // skip versioned object olh info | |
f64942e4 AA |
488 | continue; |
489 | } | |
7c673cae | 490 | |
f64942e4 | 491 | if (attr_name == RGW_ATTR_ACL) { |
7c673cae | 492 | try { |
11fdf7f2 TL |
493 | auto i = val.cbegin(); |
494 | decode(policy, i); | |
7c673cae FG |
495 | } catch (buffer::error& err) { |
496 | ldout(cct, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl; | |
f64942e4 | 497 | continue; |
7c673cae FG |
498 | } |
499 | ||
500 | const RGWAccessControlList& acl = policy.get_acl(); | |
501 | ||
502 | permissions.insert(policy.get_owner().get_id().to_str()); | |
503 | for (auto acliter : acl.get_grant_map()) { | |
504 | const ACLGrant& grant = acliter.second; | |
505 | if (grant.get_type().get_type() == ACL_TYPE_CANON_USER && | |
506 | ((uint32_t)grant.get_permission().get_permissions() & RGW_PERM_READ) != 0) { | |
507 | rgw_user user; | |
508 | if (grant.get_id(user)) { | |
509 | permissions.insert(user.to_str()); | |
510 | } | |
511 | } | |
512 | } | |
f64942e4 AA |
513 | } else if (attr_name == RGW_ATTR_TAGS) { |
514 | try { | |
11fdf7f2 TL |
515 | auto tags_bl = val.cbegin(); |
516 | decode(obj_tags, tags_bl); | |
f64942e4 AA |
517 | } catch (buffer::error& err) { |
518 | ldout(cct,0) << "ERROR: failed to decode obj tags for " | |
519 | << bucket_info.bucket << "/" << key << dendl; | |
520 | continue; | |
521 | } | |
522 | } else if (attr_name == RGW_ATTR_COMPRESSION) { | |
28e407b8 | 523 | RGWCompressionInfo cs_info; |
f64942e4 | 524 | try { |
11fdf7f2 TL |
525 | auto vals_bl = val.cbegin(); |
526 | decode(cs_info, vals_bl); | |
f64942e4 AA |
527 | } catch (buffer::error& err) { |
528 | ldout(cct,0) << "ERROR: failed to decode compression attr for " | |
529 | << bucket_info.bucket << "/" << key << dendl; | |
530 | continue; | |
531 | } | |
532 | out_attrs.emplace("compression",std::move(cs_info.compression_type)); | |
7c673cae | 533 | } else { |
f64942e4 AA |
534 | if (!is_sys_attr(attr_name)) { |
535 | out_attrs.emplace(attr_name.substr(sizeof(RGW_ATTR_PREFIX) - 1), | |
a8e16298 | 536 | std::string(val.c_str(), attr_len(val))); |
7c673cae FG |
537 | } |
538 | } | |
539 | } | |
540 | ::encode_json("bucket", bucket_info.bucket.name, f); | |
541 | ::encode_json("name", key.name, f); | |
a8e16298 TL |
542 | string instance = key.instance; |
543 | if (instance.empty()) | |
544 | instance = "null"; | |
545 | ::encode_json("instance", instance, f); | |
31f18b77 | 546 | ::encode_json("versioned_epoch", versioned_epoch, f); |
7c673cae FG |
547 | ::encode_json("owner", policy.get_owner(), f); |
548 | ::encode_json("permissions", permissions, f); | |
549 | f->open_object_section("meta"); | |
550 | ::encode_json("size", size, f); | |
551 | ||
552 | string mtime_str; | |
553 | rgw_to_iso8601(mtime, &mtime_str); | |
554 | ::encode_json("mtime", mtime_str, f); | |
555 | for (auto i : out_attrs) { | |
556 | ::encode_json(i.first.c_str(), i.second, f); | |
557 | } | |
31f18b77 FG |
558 | map<string, string> custom_str; |
559 | map<string, string> custom_int; | |
560 | map<string, string> custom_date; | |
561 | ||
562 | for (auto i : custom_meta) { | |
563 | auto config = bucket_info.mdsearch_config.find(i.first); | |
564 | if (config == bucket_info.mdsearch_config.end()) { | |
565 | if (!es_conf->explicit_custom_meta) { | |
566 | /* default custom meta is of type string */ | |
567 | custom_str[i.first] = i.second; | |
568 | } else { | |
569 | ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl; | |
570 | } | |
571 | continue; | |
572 | } | |
573 | switch (config->second) { | |
574 | case ESEntityTypeMap::ES_ENTITY_DATE: | |
575 | custom_date[i.first] = i.second; | |
576 | break; | |
577 | case ESEntityTypeMap::ES_ENTITY_INT: | |
578 | custom_int[i.first] = i.second; | |
579 | break; | |
580 | default: | |
581 | custom_str[i.first] = i.second; | |
582 | } | |
583 | } | |
584 | ||
585 | if (!custom_str.empty()) { | |
586 | f->open_array_section("custom-string"); | |
587 | for (auto i : custom_str) { | |
588 | f->open_object_section("entity"); | |
589 | ::encode_json("name", i.first.c_str(), f); | |
590 | ::encode_json("value", i.second, f); | |
591 | f->close_section(); | |
592 | } | |
593 | f->close_section(); | |
594 | } | |
595 | if (!custom_int.empty()) { | |
596 | f->open_array_section("custom-int"); | |
597 | for (auto i : custom_int) { | |
598 | f->open_object_section("entity"); | |
599 | ::encode_json("name", i.first.c_str(), f); | |
600 | ::encode_json("value", i.second, f); | |
601 | f->close_section(); | |
602 | } | |
603 | f->close_section(); | |
604 | } | |
605 | if (!custom_date.empty()) { | |
606 | f->open_array_section("custom-date"); | |
607 | for (auto i : custom_date) { | |
608 | /* | |
609 | * try to exlicitly parse date field, otherwise elasticsearch could reject the whole doc, | |
610 | * which will end up with failed sync | |
611 | */ | |
612 | real_time t; | |
613 | int r = parse_time(i.second.c_str(), &t); | |
614 | if (r < 0) { | |
615 | ldout(cct, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl; | |
616 | continue; | |
617 | } | |
618 | ||
619 | string time_str; | |
620 | rgw_to_iso8601(t, &time_str); | |
621 | ||
622 | f->open_object_section("entity"); | |
623 | ::encode_json("name", i.first.c_str(), f); | |
624 | ::encode_json("value", time_str.c_str(), f); | |
625 | f->close_section(); | |
7c673cae FG |
626 | } |
627 | f->close_section(); | |
628 | } | |
224ce89b WB |
629 | f->close_section(); // meta |
630 | const auto& m = obj_tags.get_tags(); | |
631 | if (m.size() > 0){ | |
632 | f->open_array_section("tagging"); | |
633 | for (const auto &it : m) { | |
634 | f->open_object_section("tag"); | |
635 | ::encode_json("key", it.first, f); | |
636 | ::encode_json("value",it.second, f); | |
637 | f->close_section(); | |
638 | } | |
639 | f->close_section(); // tagging | |
640 | } | |
7c673cae | 641 | } |
31f18b77 FG |
642 | }; |
643 | ||
9f95a23c | 644 | class RGWElasticGetESInfoCBCR : public RGWCoroutine { |
31f18b77 | 645 | public: |
9f95a23c TL |
646 | RGWElasticGetESInfoCBCR(RGWDataSyncCtx *_sc, |
647 | ElasticConfigRef _conf) : RGWCoroutine(_sc->cct), | |
648 | sc(_sc), sync_env(_sc->env), | |
31f18b77 FG |
649 | conf(_conf) {} |
650 | int operate() override { | |
651 | reenter(this) { | |
9f95a23c | 652 | ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch info for zone: " << sc->source_zone << dendl; |
a8e16298 TL |
653 | yield call(new RGWReadRESTResourceCR<ESInfo> (sync_env->cct, |
654 | conf->conn.get(), | |
655 | sync_env->http_manager, | |
656 | "/", nullptr /*params*/, | |
657 | &(conf->default_headers), | |
9f95a23c | 658 | &(conf->es_info))); |
a8e16298 | 659 | if (retcode < 0) { |
9f95a23c | 660 | ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch failed: " << retcode << dendl; |
a8e16298 TL |
661 | return set_cr_error(retcode); |
662 | } | |
663 | ||
9f95a23c TL |
664 | ldout(sync_env->cct, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl; |
665 | return set_cr_done(); | |
666 | } | |
667 | return 0; | |
668 | } | |
669 | private: | |
670 | RGWDataSyncCtx *sc; | |
671 | RGWDataSyncEnv *sync_env; | |
672 | ElasticConfigRef conf; | |
673 | }; | |
674 | ||
675 | class RGWElasticPutIndexCBCR : public RGWCoroutine { | |
676 | public: | |
677 | RGWElasticPutIndexCBCR(RGWDataSyncCtx *_sc, | |
678 | ElasticConfigRef _conf) : RGWCoroutine(_sc->cct), | |
679 | sc(_sc), sync_env(_sc->env), | |
680 | conf(_conf) {} | |
681 | int operate() override { | |
682 | reenter(this) { | |
683 | ldout(sc->cct, 5) << conf->id << ": put elasticsearch index for zone: " << sc->source_zone << dendl; | |
684 | ||
31f18b77 FG |
685 | yield { |
686 | string path = conf->get_index_path(); | |
31f18b77 | 687 | es_index_settings settings(conf->num_replicas, conf->num_shards); |
a8e16298 | 688 | std::unique_ptr<es_index_config_base> index_conf; |
31f18b77 | 689 | |
9f95a23c TL |
690 | if (conf->es_info.version >= ES_V5) { |
691 | ldout(sc->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl; | |
692 | index_conf.reset(new es_index_config<es_type_v5>(settings, conf->es_info.version)); | |
a8e16298 | 693 | } else { |
9f95a23c TL |
694 | ldout(sc->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl; |
695 | index_conf.reset(new es_index_config<es_type_v2>(settings, conf->es_info.version)); | |
a8e16298 | 696 | } |
9f95a23c | 697 | call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sc->cct, |
a8e16298 TL |
698 | conf->conn.get(), |
699 | sync_env->http_manager, | |
700 | path, nullptr /*params*/, | |
701 | &(conf->default_headers), | |
702 | *index_conf, nullptr, &err_response)); | |
31f18b77 FG |
703 | } |
704 | if (retcode < 0) { | |
a8e16298 | 705 | |
eafe8130 | 706 | if (err_response.error.type != "index_already_exists_exception" && |
9f95a23c TL |
707 | err_response.error.type != "resource_already_exists_exception") { |
708 | ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl; | |
a8e16298 TL |
709 | return set_cr_error(retcode); |
710 | } | |
711 | ||
712 | ldout(sync_env->cct, 0) << "elasticsearch: index already exists, assuming external initialization" << dendl; | |
31f18b77 FG |
713 | } |
714 | return set_cr_done(); | |
715 | } | |
716 | return 0; | |
717 | } | |
7c673cae | 718 | |
9f95a23c TL |
719 | private: |
720 | RGWDataSyncCtx *sc; | |
721 | RGWDataSyncEnv *sync_env; | |
722 | ElasticConfigRef conf; | |
723 | ||
724 | struct _err_response { | |
725 | struct err_reason { | |
726 | vector<err_reason> root_cause; | |
727 | string type; | |
728 | string reason; | |
729 | string index; | |
730 | ||
731 | void decode_json(JSONObj *obj) { | |
732 | JSONDecoder::decode_json("root_cause", root_cause, obj); | |
733 | JSONDecoder::decode_json("type", type, obj); | |
734 | JSONDecoder::decode_json("reason", reason, obj); | |
735 | JSONDecoder::decode_json("index", index, obj); | |
736 | } | |
737 | } error; | |
738 | ||
739 | void decode_json(JSONObj *obj) { | |
740 | JSONDecoder::decode_json("error", error, obj); | |
741 | } | |
742 | } err_response; | |
743 | }; | |
744 | ||
745 | class RGWElasticInitConfigCBCR : public RGWCoroutine { | |
746 | RGWDataSyncCtx *sc; | |
747 | RGWDataSyncEnv *sync_env; | |
748 | ElasticConfigRef conf; | |
749 | ||
750 | public: | |
751 | RGWElasticInitConfigCBCR(RGWDataSyncCtx *_sc, | |
752 | ElasticConfigRef _conf) : RGWCoroutine(_sc->cct), | |
753 | sc(_sc), sync_env(_sc->env), | |
754 | conf(_conf) {} | |
755 | int operate() override { | |
756 | reenter(this) { | |
757 | ||
758 | yield call(new RGWElasticGetESInfoCBCR(sc, conf)); | |
759 | ||
760 | if (retcode < 0) { | |
761 | return set_cr_error(retcode); | |
762 | } | |
763 | ||
764 | yield call(new RGWElasticPutIndexCBCR(sc, conf)); | |
765 | if (retcode < 0) { | |
766 | return set_cr_error(retcode); | |
767 | } | |
768 | return set_cr_done(); | |
769 | } | |
770 | return 0; | |
771 | } | |
772 | ||
7c673cae FG |
773 | }; |
774 | ||
775 | class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { | |
9f95a23c | 776 | rgw_bucket_sync_pipe sync_pipe; |
31f18b77 FG |
777 | ElasticConfigRef conf; |
778 | uint64_t versioned_epoch; | |
7c673cae | 779 | public: |
9f95a23c TL |
780 | RGWElasticHandleRemoteObjCBCR(RGWDataSyncCtx *_sc, |
781 | rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, | |
782 | ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key), | |
783 | sync_pipe(_sync_pipe), conf(_conf), | |
31f18b77 | 784 | versioned_epoch(_versioned_epoch) {} |
7c673cae FG |
785 | int operate() override { |
786 | reenter(this) { | |
9f95a23c TL |
787 | ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sc->source_zone |
788 | << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key | |
a8e16298 TL |
789 | << " size=" << size << " mtime=" << mtime << dendl; |
790 | ||
7c673cae | 791 | yield { |
9f95a23c TL |
792 | string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key); |
793 | es_obj_metadata doc(sync_env->cct, conf, sync_pipe.dest_bucket_info, key, mtime, size, attrs, versioned_epoch); | |
7c673cae | 794 | |
31f18b77 | 795 | call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(), |
7c673cae FG |
796 | sync_env->http_manager, |
797 | path, nullptr /* params */, | |
a8e16298 | 798 | &(conf->default_headers), |
7c673cae FG |
799 | doc, nullptr /* result */)); |
800 | ||
801 | } | |
802 | if (retcode < 0) { | |
803 | return set_cr_error(retcode); | |
804 | } | |
805 | return set_cr_done(); | |
806 | } | |
807 | return 0; | |
808 | } | |
7c673cae FG |
809 | }; |
810 | ||
811 | class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR { | |
9f95a23c | 812 | rgw_bucket_sync_pipe sync_pipe; |
31f18b77 FG |
813 | ElasticConfigRef conf; |
814 | uint64_t versioned_epoch; | |
7c673cae | 815 | public: |
9f95a23c TL |
816 | RGWElasticHandleRemoteObjCR(RGWDataSyncCtx *_sc, |
817 | rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, | |
818 | ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key), | |
819 | sync_pipe(_sync_pipe), | |
31f18b77 | 820 | conf(_conf), versioned_epoch(_versioned_epoch) { |
7c673cae FG |
821 | } |
822 | ||
823 | ~RGWElasticHandleRemoteObjCR() override {} | |
824 | ||
825 | RGWStatRemoteObjCBCR *allocate_callback() override { | |
9f95a23c | 826 | return new RGWElasticHandleRemoteObjCBCR(sc, sync_pipe, key, conf, versioned_epoch); |
7c673cae FG |
827 | } |
828 | }; | |
829 | ||
830 | class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine { | |
9f95a23c | 831 | RGWDataSyncCtx *sc; |
7c673cae | 832 | RGWDataSyncEnv *sync_env; |
9f95a23c | 833 | rgw_bucket_sync_pipe sync_pipe; |
7c673cae FG |
834 | rgw_obj_key key; |
835 | ceph::real_time mtime; | |
31f18b77 | 836 | ElasticConfigRef conf; |
7c673cae | 837 | public: |
9f95a23c TL |
838 | RGWElasticRemoveRemoteObjCBCR(RGWDataSyncCtx *_sc, |
839 | rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime, | |
840 | ElasticConfigRef _conf) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), | |
841 | sync_pipe(_sync_pipe), key(_key), | |
7c673cae FG |
842 | mtime(_mtime), conf(_conf) {} |
843 | int operate() override { | |
844 | reenter(this) { | |
9f95a23c TL |
845 | ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sc->source_zone |
846 | << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl; | |
7c673cae | 847 | yield { |
9f95a23c | 848 | string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key); |
7c673cae | 849 | |
31f18b77 | 850 | call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(), |
7c673cae FG |
851 | sync_env->http_manager, |
852 | path, nullptr /* params */)); | |
853 | } | |
854 | if (retcode < 0) { | |
855 | return set_cr_error(retcode); | |
856 | } | |
857 | return set_cr_done(); | |
858 | } | |
859 | return 0; | |
860 | } | |
861 | ||
862 | }; | |
863 | ||
864 | class RGWElasticDataSyncModule : public RGWDataSyncModule { | |
31f18b77 | 865 | ElasticConfigRef conf; |
7c673cae | 866 | public: |
11fdf7f2 | 867 | RGWElasticDataSyncModule(CephContext *cct, const JSONFormattable& config) : conf(std::make_shared<ElasticConfig>()) { |
31f18b77 | 868 | conf->init(cct, config); |
7c673cae | 869 | } |
31f18b77 FG |
870 | ~RGWElasticDataSyncModule() override {} |
871 | ||
9f95a23c TL |
872 | void init(RGWDataSyncCtx *sc, uint64_t instance_id) override { |
873 | conf->init_instance(sc->env->svc->zone->get_realm(), instance_id); | |
874 | } | |
875 | ||
876 | RGWCoroutine *init_sync(RGWDataSyncCtx *sc) override { | |
877 | ldout(sc->cct, 5) << conf->id << ": init" << dendl; | |
878 | return new RGWElasticInitConfigCBCR(sc, conf); | |
7c673cae FG |
879 | } |
880 | ||
9f95a23c TL |
881 | RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override { |
882 | ldout(sc->cct, 5) << conf->id << ": start_sync" << dendl; | |
883 | // try to get elastic search version | |
884 | return new RGWElasticGetESInfoCBCR(sc, conf); | |
31f18b77 | 885 | } |
9f95a23c TL |
886 | |
887 | RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override { | |
888 | ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; | |
889 | if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) { | |
890 | ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; | |
31f18b77 FG |
891 | return nullptr; |
892 | } | |
9f95a23c | 893 | return new RGWElasticHandleRemoteObjCR(sc, sync_pipe, key, conf, versioned_epoch.value_or(0)); |
7c673cae | 894 | } |
9f95a23c | 895 | RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { |
7c673cae | 896 | /* versioned and versioned epoch params are useless in the elasticsearch backend case */ |
9f95a23c TL |
897 | ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; |
898 | if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) { | |
899 | ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; | |
31f18b77 FG |
900 | return nullptr; |
901 | } | |
9f95a23c | 902 | return new RGWElasticRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, conf); |
7c673cae | 903 | } |
9f95a23c | 904 | RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, |
31f18b77 | 905 | rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { |
9f95a23c | 906 | ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime |
7c673cae | 907 | << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; |
9f95a23c | 908 | ldout(sc->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl; |
7c673cae FG |
909 | return NULL; |
910 | } | |
31f18b77 FG |
911 | RGWRESTConn *get_rest_conn() { |
912 | return conf->conn.get(); | |
913 | } | |
7c673cae | 914 | |
31f18b77 FG |
915 | string get_index_path() { |
916 | return conf->get_index_path(); | |
7c673cae | 917 | } |
a8e16298 TL |
918 | |
919 | map<string, string>& get_request_headers() { | |
920 | return conf->get_request_headers(); | |
921 | } | |
7c673cae FG |
922 | }; |
923 | ||
11fdf7f2 | 924 | RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const JSONFormattable& config) |
31f18b77 FG |
925 | { |
926 | data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, config)); | |
927 | } | |
928 | ||
929 | RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler() | |
930 | { | |
931 | return data_handler.get(); | |
932 | } | |
933 | ||
934 | RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn() | |
935 | { | |
936 | return data_handler->get_rest_conn(); | |
937 | } | |
938 | ||
939 | string RGWElasticSyncModuleInstance::get_index_path() { | |
940 | return data_handler->get_index_path(); | |
941 | } | |
942 | ||
a8e16298 TL |
943 | map<string, string>& RGWElasticSyncModuleInstance::get_request_headers() { |
944 | return data_handler->get_request_headers(); | |
945 | } | |
946 | ||
31f18b77 FG |
947 | RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) { |
948 | if (dialect != RGW_REST_S3) { | |
949 | return orig; | |
950 | } | |
951 | delete orig; | |
952 | return new RGWRESTMgr_MDSearch_S3(); | |
953 | } | |
954 | ||
11fdf7f2 TL |
955 | int RGWElasticSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { |
956 | string endpoint = config["endpoint"]; | |
31f18b77 | 957 | instance->reset(new RGWElasticSyncModuleInstance(cct, config)); |
7c673cae FG |
958 | return 0; |
959 | } | |
960 |