]>
Commit | Line | Data |
---|---|---|
1 | #include "rgw_common.h" | |
2 | #include "rgw_coroutine.h" | |
3 | #include "rgw_sync_module.h" | |
4 | #include "rgw_data_sync.h" | |
5 | #include "rgw_sync_module_es.h" | |
6 | #include "rgw_sync_module_es_rest.h" | |
7 | #include "rgw_rest_conn.h" | |
8 | #include "rgw_cr_rest.h" | |
9 | #include "rgw_op.h" | |
10 | #include "rgw_es_query.h" | |
11 | ||
12 | #include "include/str_list.h" | |
13 | ||
14 | #include <boost/asio/yield.hpp> | |
15 | ||
16 | #define dout_subsys ceph_subsys_rgw | |
17 | ||
18 | ||
19 | /* | |
20 | * whitelist utility. Config string is a list of entries, where an entry is either an item, | |
21 | * a prefix, or a suffix. An item would be the name of the entity that we'd look up, | |
22 | * a prefix would be a string ending with an asterisk, a suffix would be a string starting | |
23 | * with an asterisk. For example: | |
24 | * | |
25 | * bucket1, bucket2, foo*, *bar | |
26 | */ | |
27 | class ItemList { | |
28 | bool approve_all{false}; | |
29 | ||
30 | set<string> entries; | |
31 | set<string> prefixes; | |
32 | set<string> suffixes; | |
33 | ||
34 | void parse(const string& str) { | |
35 | list<string> l; | |
36 | ||
37 | get_str_list(str, ",", l); | |
38 | ||
39 | for (auto& entry : l) { | |
40 | entry = rgw_trim_whitespace(entry); | |
41 | if (entry.empty()) { | |
42 | continue; | |
43 | } | |
44 | ||
45 | if (entry == "*") { | |
46 | approve_all = true; | |
47 | return; | |
48 | } | |
49 | ||
50 | if (entry[0] == '*') { | |
51 | suffixes.insert(entry.substr(1)); | |
52 | continue; | |
53 | } | |
54 | ||
55 | if (entry.back() == '*') { | |
56 | prefixes.insert(entry.substr(0, entry.size() - 1)); | |
57 | continue; | |
58 | } | |
59 | ||
60 | entries.insert(entry); | |
61 | } | |
62 | } | |
63 | ||
64 | public: | |
65 | ItemList() {} | |
66 | void init(const string& str, bool def_val) { | |
67 | if (str.empty()) { | |
68 | approve_all = def_val; | |
69 | } else { | |
70 | parse(str); | |
71 | } | |
72 | } | |
73 | ||
74 | bool exists(const string& entry) { | |
75 | if (approve_all) { | |
76 | return true; | |
77 | } | |
78 | ||
79 | if (entries.find(entry) != entries.end()) { | |
80 | return true; | |
81 | } | |
82 | ||
83 | auto i = prefixes.upper_bound(entry); | |
84 | if (i != prefixes.begin()) { | |
85 | --i; | |
86 | if (boost::algorithm::starts_with(entry, *i)) { | |
87 | return true; | |
88 | } | |
89 | } | |
90 | ||
91 | for (i = suffixes.begin(); i != suffixes.end(); ++i) { | |
92 | if (boost::algorithm::ends_with(entry, *i)) { | |
93 | return true; | |
94 | } | |
95 | } | |
96 | ||
97 | return false; | |
98 | } | |
99 | }; | |
100 | ||
101 | #define ES_NUM_SHARDS_MIN 5 | |
102 | ||
103 | #define ES_NUM_SHARDS_DEFAULT 16 | |
104 | #define ES_NUM_REPLICAS_DEFAULT 1 | |
105 | ||
106 | struct ElasticConfig { | |
107 | uint64_t sync_instance{0}; | |
108 | string id; | |
109 | string index_path; | |
110 | std::unique_ptr<RGWRESTConn> conn; | |
111 | bool explicit_custom_meta{true}; | |
112 | string override_index_path; | |
113 | ItemList index_buckets; | |
114 | ItemList allow_owners; | |
115 | uint32_t num_shards{0}; | |
116 | uint32_t num_replicas{0}; | |
117 | ||
118 | void init(CephContext *cct, const map<string, string, ltstr_nocase>& config) { | |
119 | string elastic_endpoint = rgw_conf_get(config, "endpoint", ""); | |
120 | id = string("elastic:") + elastic_endpoint; | |
121 | conn.reset(new RGWRESTConn(cct, nullptr, id, { elastic_endpoint })); | |
122 | explicit_custom_meta = rgw_conf_get_bool(config, "explicit_custom_meta", true); | |
123 | index_buckets.init(rgw_conf_get(config, "index_buckets_list", ""), true); /* approve all buckets by default */ | |
124 | allow_owners.init(rgw_conf_get(config, "approved_owners_list", ""), true); /* approve all bucket owners by default */ | |
125 | override_index_path = rgw_conf_get(config, "override_index_path", ""); | |
126 | num_shards = rgw_conf_get_int(config, "num_shards", ES_NUM_SHARDS_DEFAULT); | |
127 | if (num_shards < ES_NUM_SHARDS_MIN) { | |
128 | num_shards = ES_NUM_SHARDS_MIN; | |
129 | } | |
130 | num_replicas = rgw_conf_get_int(config, "num_replicas", ES_NUM_REPLICAS_DEFAULT); | |
131 | } | |
132 | ||
133 | void init_instance(RGWRealm& realm, uint64_t instance_id) { | |
134 | sync_instance = instance_id; | |
135 | ||
136 | if (!override_index_path.empty()) { | |
137 | index_path = override_index_path; | |
138 | return; | |
139 | } | |
140 | ||
141 | char buf[32]; | |
142 | snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF)); | |
143 | ||
144 | index_path = "/rgw-" + realm.get_name() + buf; | |
145 | } | |
146 | ||
147 | string get_index_path() { | |
148 | return index_path; | |
149 | } | |
150 | ||
151 | string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) { | |
152 | return index_path + "/object/" + bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance); | |
153 | } | |
154 | ||
155 | bool should_handle_operation(RGWBucketInfo& bucket_info) { | |
156 | return index_buckets.exists(bucket_info.bucket.name) && | |
157 | allow_owners.exists(bucket_info.owner.to_str()); | |
158 | } | |
159 | }; | |
160 | ||
161 | using ElasticConfigRef = std::shared_ptr<ElasticConfig>; | |
162 | ||
163 | struct es_dump_type { | |
164 | const char *type; | |
165 | const char *format; | |
166 | bool analyzed; | |
167 | ||
168 | es_dump_type(const char *t, const char *f = nullptr, bool a = false) : type(t), format(f), analyzed(a) {} | |
169 | ||
170 | void dump(Formatter *f) const { | |
171 | encode_json("type", type, f); | |
172 | if (format) { | |
173 | encode_json("format", format, f); | |
174 | } | |
175 | if (!analyzed && strcmp(type, "string") == 0) { | |
176 | encode_json("index", "not_analyzed", f); | |
177 | } | |
178 | } | |
179 | }; | |
180 | ||
181 | struct es_index_mappings { | |
182 | void dump_custom(Formatter *f, const char *section, const char *type, const char *format) const { | |
183 | f->open_object_section(section); | |
184 | ::encode_json("type", "nested", f); | |
185 | f->open_object_section("properties"); | |
186 | encode_json("name", es_dump_type("string"), f); | |
187 | encode_json("value", es_dump_type(type, format), f); | |
188 | f->close_section(); // entry | |
189 | f->close_section(); // custom-string | |
190 | } | |
191 | void dump(Formatter *f) const { | |
192 | f->open_object_section("object"); | |
193 | f->open_object_section("properties"); | |
194 | encode_json("bucket", es_dump_type("string"), f); | |
195 | encode_json("name", es_dump_type("string"), f); | |
196 | encode_json("instance", es_dump_type("string"), f); | |
197 | encode_json("versioned_epoch", es_dump_type("long"), f); | |
198 | f->open_object_section("meta"); | |
199 | f->open_object_section("properties"); | |
200 | encode_json("cache_control", es_dump_type("string"), f); | |
201 | encode_json("content_disposition", es_dump_type("string"), f); | |
202 | encode_json("content_encoding", es_dump_type("string"), f); | |
203 | encode_json("content_language", es_dump_type("string"), f); | |
204 | encode_json("content_type", es_dump_type("string"), f); | |
205 | encode_json("etag", es_dump_type("string"), f); | |
206 | encode_json("expires", es_dump_type("string"), f); | |
207 | f->open_object_section("mtime"); | |
208 | ::encode_json("type", "date", f); | |
209 | ::encode_json("format", "strict_date_optional_time||epoch_millis", f); | |
210 | f->close_section(); // mtime | |
211 | encode_json("size", es_dump_type("long"), f); | |
212 | dump_custom(f, "custom-string", "string", nullptr); | |
213 | dump_custom(f, "custom-int", "long", nullptr); | |
214 | dump_custom(f, "custom-date", "date", "strict_date_optional_time||epoch_millis"); | |
215 | f->close_section(); // properties | |
216 | f->close_section(); // meta | |
217 | f->close_section(); // properties | |
218 | f->close_section(); // object | |
219 | } | |
220 | }; | |
221 | ||
222 | struct es_index_settings { | |
223 | uint32_t num_replicas; | |
224 | uint32_t num_shards; | |
225 | ||
226 | es_index_settings(uint32_t _replicas, uint32_t _shards) : num_replicas(_replicas), num_shards(_shards) {} | |
227 | ||
228 | void dump(Formatter *f) const { | |
229 | encode_json("number_of_replicas", num_replicas, f); | |
230 | encode_json("number_of_shards", num_shards, f); | |
231 | } | |
232 | }; | |
233 | ||
234 | struct es_index_config { | |
235 | es_index_settings settings; | |
236 | es_index_mappings mappings; | |
237 | ||
238 | es_index_config(es_index_settings& _s, es_index_mappings& _m) : settings(_s), mappings(_m) {} | |
239 | ||
240 | void dump(Formatter *f) const { | |
241 | encode_json("settings", settings, f); | |
242 | encode_json("mappings", mappings, f); | |
243 | } | |
244 | }; | |
245 | ||
246 | struct es_obj_metadata { | |
247 | CephContext *cct; | |
248 | ElasticConfigRef es_conf; | |
249 | RGWBucketInfo bucket_info; | |
250 | rgw_obj_key key; | |
251 | ceph::real_time mtime; | |
252 | uint64_t size; | |
253 | map<string, bufferlist> attrs; | |
254 | uint64_t versioned_epoch; | |
255 | ||
256 | es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info, | |
257 | const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size, | |
258 | map<string, bufferlist>& _attrs, uint64_t _versioned_epoch) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key), | |
259 | mtime(_mtime), size(_size), attrs(std::move(_attrs)), versioned_epoch(_versioned_epoch) {} | |
260 | ||
261 | void dump(Formatter *f) const { | |
262 | map<string, string> out_attrs; | |
263 | map<string, string> custom_meta; | |
264 | RGWAccessControlPolicy policy; | |
265 | set<string> permissions; | |
266 | RGWObjTags obj_tags; | |
267 | ||
268 | for (auto i : attrs) { | |
269 | const string& attr_name = i.first; | |
270 | string name; | |
271 | bufferlist& val = i.second; | |
272 | ||
273 | if (attr_name.compare(0, sizeof(RGW_ATTR_PREFIX) - 1, RGW_ATTR_PREFIX) != 0) { | |
274 | continue; | |
275 | } | |
276 | ||
277 | if (attr_name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1, RGW_ATTR_META_PREFIX) == 0) { | |
278 | name = attr_name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1); | |
279 | custom_meta[name] = string(val.c_str(), (val.length() > 0 ? val.length() - 1 : 0)); | |
280 | continue; | |
281 | } | |
282 | ||
283 | name = attr_name.substr(sizeof(RGW_ATTR_PREFIX) - 1); | |
284 | ||
285 | if (name == "acl") { | |
286 | try { | |
287 | auto i = val.begin(); | |
288 | ::decode(policy, i); | |
289 | } catch (buffer::error& err) { | |
290 | ldout(cct, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl; | |
291 | } | |
292 | ||
293 | const RGWAccessControlList& acl = policy.get_acl(); | |
294 | ||
295 | permissions.insert(policy.get_owner().get_id().to_str()); | |
296 | for (auto acliter : acl.get_grant_map()) { | |
297 | const ACLGrant& grant = acliter.second; | |
298 | if (grant.get_type().get_type() == ACL_TYPE_CANON_USER && | |
299 | ((uint32_t)grant.get_permission().get_permissions() & RGW_PERM_READ) != 0) { | |
300 | rgw_user user; | |
301 | if (grant.get_id(user)) { | |
302 | permissions.insert(user.to_str()); | |
303 | } | |
304 | } | |
305 | } | |
306 | } else if (name == "x-amz-tagging") { | |
307 | auto tags_bl = val.begin(); | |
308 | ::decode(obj_tags, tags_bl); | |
309 | } else if (name == "compression") { | |
310 | RGWCompressionInfo cs_info; | |
311 | auto vals_bl = val.begin(); | |
312 | decode(cs_info, vals_bl); | |
313 | out_attrs[name] = cs_info.compression_type; | |
314 | } else { | |
315 | if (name != "pg_ver" && | |
316 | name != "source_zone" && | |
317 | name != "idtag") { | |
318 | out_attrs[name] = string(val.c_str(), (val.length() > 0 ? val.length() - 1 : 0)); | |
319 | } | |
320 | } | |
321 | } | |
322 | ::encode_json("bucket", bucket_info.bucket.name, f); | |
323 | ::encode_json("name", key.name, f); | |
324 | ::encode_json("instance", key.instance, f); | |
325 | ::encode_json("versioned_epoch", versioned_epoch, f); | |
326 | ::encode_json("owner", policy.get_owner(), f); | |
327 | ::encode_json("permissions", permissions, f); | |
328 | f->open_object_section("meta"); | |
329 | ::encode_json("size", size, f); | |
330 | ||
331 | string mtime_str; | |
332 | rgw_to_iso8601(mtime, &mtime_str); | |
333 | ::encode_json("mtime", mtime_str, f); | |
334 | for (auto i : out_attrs) { | |
335 | ::encode_json(i.first.c_str(), i.second, f); | |
336 | } | |
337 | map<string, string> custom_str; | |
338 | map<string, string> custom_int; | |
339 | map<string, string> custom_date; | |
340 | ||
341 | for (auto i : custom_meta) { | |
342 | auto config = bucket_info.mdsearch_config.find(i.first); | |
343 | if (config == bucket_info.mdsearch_config.end()) { | |
344 | if (!es_conf->explicit_custom_meta) { | |
345 | /* default custom meta is of type string */ | |
346 | custom_str[i.first] = i.second; | |
347 | } else { | |
348 | ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl; | |
349 | } | |
350 | continue; | |
351 | } | |
352 | switch (config->second) { | |
353 | case ESEntityTypeMap::ES_ENTITY_DATE: | |
354 | custom_date[i.first] = i.second; | |
355 | break; | |
356 | case ESEntityTypeMap::ES_ENTITY_INT: | |
357 | custom_int[i.first] = i.second; | |
358 | break; | |
359 | default: | |
360 | custom_str[i.first] = i.second; | |
361 | } | |
362 | } | |
363 | ||
364 | if (!custom_str.empty()) { | |
365 | f->open_array_section("custom-string"); | |
366 | for (auto i : custom_str) { | |
367 | f->open_object_section("entity"); | |
368 | ::encode_json("name", i.first.c_str(), f); | |
369 | ::encode_json("value", i.second, f); | |
370 | f->close_section(); | |
371 | } | |
372 | f->close_section(); | |
373 | } | |
374 | if (!custom_int.empty()) { | |
375 | f->open_array_section("custom-int"); | |
376 | for (auto i : custom_int) { | |
377 | f->open_object_section("entity"); | |
378 | ::encode_json("name", i.first.c_str(), f); | |
379 | ::encode_json("value", i.second, f); | |
380 | f->close_section(); | |
381 | } | |
382 | f->close_section(); | |
383 | } | |
384 | if (!custom_date.empty()) { | |
385 | f->open_array_section("custom-date"); | |
386 | for (auto i : custom_date) { | |
387 | /* | |
388 | * try to exlicitly parse date field, otherwise elasticsearch could reject the whole doc, | |
389 | * which will end up with failed sync | |
390 | */ | |
391 | real_time t; | |
392 | int r = parse_time(i.second.c_str(), &t); | |
393 | if (r < 0) { | |
394 | ldout(cct, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl; | |
395 | continue; | |
396 | } | |
397 | ||
398 | string time_str; | |
399 | rgw_to_iso8601(t, &time_str); | |
400 | ||
401 | f->open_object_section("entity"); | |
402 | ::encode_json("name", i.first.c_str(), f); | |
403 | ::encode_json("value", time_str.c_str(), f); | |
404 | f->close_section(); | |
405 | } | |
406 | f->close_section(); | |
407 | } | |
408 | f->close_section(); // meta | |
409 | const auto& m = obj_tags.get_tags(); | |
410 | if (m.size() > 0){ | |
411 | f->open_array_section("tagging"); | |
412 | for (const auto &it : m) { | |
413 | f->open_object_section("tag"); | |
414 | ::encode_json("key", it.first, f); | |
415 | ::encode_json("value",it.second, f); | |
416 | f->close_section(); | |
417 | } | |
418 | f->close_section(); // tagging | |
419 | } | |
420 | } | |
421 | }; | |
422 | ||
423 | class RGWElasticInitConfigCBCR : public RGWCoroutine { | |
424 | RGWDataSyncEnv *sync_env; | |
425 | ElasticConfigRef conf; | |
426 | public: | |
427 | RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env, | |
428 | ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), | |
429 | sync_env(_sync_env), | |
430 | conf(_conf) {} | |
431 | int operate() override { | |
432 | reenter(this) { | |
433 | ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl; | |
434 | yield { | |
435 | string path = conf->get_index_path(); | |
436 | ||
437 | es_index_settings settings(conf->num_replicas, conf->num_shards); | |
438 | es_index_mappings mappings; | |
439 | ||
440 | es_index_config index_conf(settings, mappings); | |
441 | ||
442 | call(new RGWPutRESTResourceCR<es_index_config, int>(sync_env->cct, conf->conn.get(), | |
443 | sync_env->http_manager, | |
444 | path, nullptr /* params */, | |
445 | index_conf, nullptr /* result */)); | |
446 | } | |
447 | if (retcode < 0) { | |
448 | return set_cr_error(retcode); | |
449 | } | |
450 | return set_cr_done(); | |
451 | } | |
452 | return 0; | |
453 | } | |
454 | ||
455 | }; | |
456 | ||
457 | class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { | |
458 | ElasticConfigRef conf; | |
459 | uint64_t versioned_epoch; | |
460 | public: | |
461 | RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, | |
462 | RGWBucketInfo& _bucket_info, rgw_obj_key& _key, | |
463 | ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf), | |
464 | versioned_epoch(_versioned_epoch) {} | |
465 | int operate() override { | |
466 | reenter(this) { | |
467 | ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone | |
468 | << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime | |
469 | << " attrs=" << attrs << dendl; | |
470 | yield { | |
471 | string path = conf->get_obj_path(bucket_info, key); | |
472 | es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch); | |
473 | ||
474 | call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(), | |
475 | sync_env->http_manager, | |
476 | path, nullptr /* params */, | |
477 | doc, nullptr /* result */)); | |
478 | ||
479 | } | |
480 | if (retcode < 0) { | |
481 | return set_cr_error(retcode); | |
482 | } | |
483 | return set_cr_done(); | |
484 | } | |
485 | return 0; | |
486 | } | |
487 | }; | |
488 | ||
489 | class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR { | |
490 | ElasticConfigRef conf; | |
491 | uint64_t versioned_epoch; | |
492 | public: | |
493 | RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, | |
494 | RGWBucketInfo& _bucket_info, rgw_obj_key& _key, | |
495 | ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), | |
496 | conf(_conf), versioned_epoch(_versioned_epoch) { | |
497 | } | |
498 | ||
499 | ~RGWElasticHandleRemoteObjCR() override {} | |
500 | ||
501 | RGWStatRemoteObjCBCR *allocate_callback() override { | |
502 | return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch); | |
503 | } | |
504 | }; | |
505 | ||
506 | class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine { | |
507 | RGWDataSyncEnv *sync_env; | |
508 | RGWBucketInfo bucket_info; | |
509 | rgw_obj_key key; | |
510 | ceph::real_time mtime; | |
511 | ElasticConfigRef conf; | |
512 | public: | |
513 | RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env, | |
514 | RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime, | |
515 | ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), | |
516 | bucket_info(_bucket_info), key(_key), | |
517 | mtime(_mtime), conf(_conf) {} | |
518 | int operate() override { | |
519 | reenter(this) { | |
520 | ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone | |
521 | << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl; | |
522 | yield { | |
523 | string path = conf->get_obj_path(bucket_info, key); | |
524 | ||
525 | call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(), | |
526 | sync_env->http_manager, | |
527 | path, nullptr /* params */)); | |
528 | } | |
529 | if (retcode < 0) { | |
530 | return set_cr_error(retcode); | |
531 | } | |
532 | return set_cr_done(); | |
533 | } | |
534 | return 0; | |
535 | } | |
536 | ||
537 | }; | |
538 | ||
539 | class RGWElasticDataSyncModule : public RGWDataSyncModule { | |
540 | ElasticConfigRef conf; | |
541 | public: | |
542 | RGWElasticDataSyncModule(CephContext *cct, const map<string, string, ltstr_nocase>& config) : conf(std::make_shared<ElasticConfig>()) { | |
543 | conf->init(cct, config); | |
544 | } | |
545 | ~RGWElasticDataSyncModule() override {} | |
546 | ||
547 | void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override { | |
548 | conf->init_instance(sync_env->store->get_realm(), instance_id); | |
549 | } | |
550 | ||
551 | RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override { | |
552 | ldout(sync_env->cct, 5) << conf->id << ": init" << dendl; | |
553 | return new RGWElasticInitConfigCBCR(sync_env, conf); | |
554 | } | |
555 | RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { | |
556 | ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; | |
557 | if (!conf->should_handle_operation(bucket_info)) { | |
558 | ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; | |
559 | return nullptr; | |
560 | } | |
561 | return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch); | |
562 | } | |
563 | RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { | |
564 | /* versioned and versioned epoch params are useless in the elasticsearch backend case */ | |
565 | ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; | |
566 | if (!conf->should_handle_operation(bucket_info)) { | |
567 | ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; | |
568 | return nullptr; | |
569 | } | |
570 | return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf); | |
571 | } | |
572 | RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, | |
573 | rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { | |
574 | ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime | |
575 | << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; | |
576 | ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl; | |
577 | return NULL; | |
578 | } | |
579 | RGWRESTConn *get_rest_conn() { | |
580 | return conf->conn.get(); | |
581 | } | |
582 | ||
583 | string get_index_path() { | |
584 | return conf->get_index_path(); | |
585 | } | |
586 | }; | |
587 | ||
588 | RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const map<string, string, ltstr_nocase>& config) | |
589 | { | |
590 | data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, config)); | |
591 | } | |
592 | ||
593 | RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler() | |
594 | { | |
595 | return data_handler.get(); | |
596 | } | |
597 | ||
598 | RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn() | |
599 | { | |
600 | return data_handler->get_rest_conn(); | |
601 | } | |
602 | ||
603 | string RGWElasticSyncModuleInstance::get_index_path() { | |
604 | return data_handler->get_index_path(); | |
605 | } | |
606 | ||
607 | RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) { | |
608 | if (dialect != RGW_REST_S3) { | |
609 | return orig; | |
610 | } | |
611 | delete orig; | |
612 | return new RGWRESTMgr_MDSearch_S3(); | |
613 | } | |
614 | ||
615 | int RGWElasticSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) { | |
616 | string endpoint; | |
617 | auto i = config.find("endpoint"); | |
618 | if (i != config.end()) { | |
619 | endpoint = i->second; | |
620 | } | |
621 | instance->reset(new RGWElasticSyncModuleInstance(cct, config)); | |
622 | return 0; | |
623 | } | |
624 |