1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include "rgw_common.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_sync_module.h"
8 #include "rgw_data_sync.h"
9 #include "rgw_sync_module_es.h"
10 #include "rgw_sync_module_es_rest.h"
11 #include "rgw_rest_conn.h"
12 #include "rgw_cr_rest.h"
14 #include "rgw_es_query.h"
16 #include "include/str_list.h"
18 #include <boost/asio/yield.hpp>
20 #define dout_subsys ceph_subsys_rgw
24 * whitelist utility. Config string is a list of entries, where an entry is either an item,
25 * a prefix, or a suffix. An item would be the name of the entity that we'd look up,
26 * a prefix would be a string ending with an asterisk, a suffix would be a string starting
27 * with an asterisk. For example:
29 * bucket1, bucket2, foo*, *bar
32 bool approve_all
{false};
38 void parse(const string
& str
) {
41 get_str_list(str
, ",", l
);
43 for (auto& entry
: l
) {
44 entry
= rgw_trim_whitespace(entry
);
54 if (entry
[0] == '*') {
55 suffixes
.insert(entry
.substr(1));
59 if (entry
.back() == '*') {
60 prefixes
.insert(entry
.substr(0, entry
.size() - 1));
64 entries
.insert(entry
);
70 void init(const string
& str
, bool def_val
) {
72 approve_all
= def_val
;
78 bool exists(const string
& entry
) {
83 if (entries
.find(entry
) != entries
.end()) {
87 auto i
= prefixes
.upper_bound(entry
);
88 if (i
!= prefixes
.begin()) {
90 if (boost::algorithm::starts_with(entry
, *i
)) {
95 for (i
= suffixes
.begin(); i
!= suffixes
.end(); ++i
) {
96 if (boost::algorithm::ends_with(entry
, *i
)) {
/*
 * Shard/replica tunables: the lower bound applied to the configured shard
 * count, and the fallbacks handed to rgw_conf_get_int() when the module
 * config does not set "num_shards" / "num_replicas".
 */
static constexpr int ES_NUM_SHARDS_MIN = 5;
static constexpr int ES_NUM_SHARDS_DEFAULT = 16;
static constexpr int ES_NUM_REPLICAS_DEFAULT = 1;

/* Elasticsearch version as (major, minor); std::pair compares lexicographically. */
using ESVersion = std::pair<int,int>;
static constexpr ESVersion ES_V5{5,0};
115 std::string cluster_name
;
116 std::string cluster_uuid
;
119 void decode_json(JSONObj
*obj
);
121 std::string
get_version_str(){
122 return std::to_string(version
.first
) + "." + std::to_string(version
.second
);
126 // simple wrapper structure to wrap the es version nested type
127 struct es_version_decoder
{
130 int parse_version(const std::string
& s
) {
132 int ret
= sscanf(s
.c_str(), "%d.%d", &major
, &minor
);
136 version
= std::make_pair(major
,minor
);
140 void decode_json(JSONObj
*obj
) {
142 JSONDecoder::decode_json("number",s
,obj
);
143 if (parse_version(s
) < 0)
144 throw JSONDecoder::err("Failed to parse ElasticVersion");
149 void ESInfo::decode_json(JSONObj
*obj
)
151 JSONDecoder::decode_json("name", name
, obj
);
152 JSONDecoder::decode_json("cluster_name", cluster_name
, obj
);
153 JSONDecoder::decode_json("cluster_uuid", cluster_uuid
, obj
);
154 es_version_decoder esv
;
155 JSONDecoder::decode_json("version", esv
, obj
);
156 version
= std::move(esv
.version
);
159 struct ElasticConfig
{
160 uint64_t sync_instance
{0};
163 std::unique_ptr
<RGWRESTConn
> conn
;
164 bool explicit_custom_meta
{true};
165 string override_index_path
;
166 ItemList index_buckets
;
167 ItemList allow_owners
;
168 uint32_t num_shards
{0};
169 uint32_t num_replicas
{0};
170 std::map
<string
,string
> default_headers
= {{ "Content-Type", "application/json" }};
172 void init(CephContext
*cct
, const map
<string
, string
, ltstr_nocase
>& config
) {
173 string elastic_endpoint
= rgw_conf_get(config
, "endpoint", "");
174 id
= string("elastic:") + elastic_endpoint
;
175 conn
.reset(new RGWRESTConn(cct
, nullptr, id
, { elastic_endpoint
}));
176 explicit_custom_meta
= rgw_conf_get_bool(config
, "explicit_custom_meta", true);
177 index_buckets
.init(rgw_conf_get(config
, "index_buckets_list", ""), true); /* approve all buckets by default */
178 allow_owners
.init(rgw_conf_get(config
, "approved_owners_list", ""), true); /* approve all bucket owners by default */
179 override_index_path
= rgw_conf_get(config
, "override_index_path", "");
180 num_shards
= rgw_conf_get_int(config
, "num_shards", ES_NUM_SHARDS_DEFAULT
);
181 if (num_shards
< ES_NUM_SHARDS_MIN
) {
182 num_shards
= ES_NUM_SHARDS_MIN
;
184 num_replicas
= rgw_conf_get_int(config
, "num_replicas", ES_NUM_REPLICAS_DEFAULT
);
185 string user
= rgw_conf_get(config
, "username", "");
186 string pw
= rgw_conf_get(config
, "password", "");
187 if (!user
.empty() && !pw
.empty()) {
188 auto auth_string
= user
+ ":" + pw
;
189 default_headers
.emplace("AUTHORIZATION", "Basic " + rgw::to_base64(auth_string
));
193 void init_instance(RGWRealm
& realm
, uint64_t instance_id
) {
194 sync_instance
= instance_id
;
196 if (!override_index_path
.empty()) {
197 index_path
= override_index_path
;
202 snprintf(buf
, sizeof(buf
), "-%08x", (uint32_t)(sync_instance
& 0xFFFFFFFF));
204 index_path
= "/rgw-" + realm
.get_name() + buf
;
207 string
get_index_path() {
211 map
<string
, string
>& get_request_headers() {
212 return default_headers
;
215 string
get_obj_path(const RGWBucketInfo
& bucket_info
, const rgw_obj_key
& key
) {
216 return index_path
+ "/object/" + url_encode(bucket_info
.bucket
.bucket_id
+ ":" + key
.name
+ ":" + (key
.instance
.empty() ? "null" : key
.instance
));
219 bool should_handle_operation(RGWBucketInfo
& bucket_info
) {
220 return index_buckets
.exists(bucket_info
.bucket
.name
) &&
221 allow_owners
.exists(bucket_info
.owner
.to_str());
225 using ElasticConfigRef
= std::shared_ptr
<ElasticConfig
>;
227 static const char *es_type_to_str(const ESType
& t
) {
229 case ESType::String
: return "string";
230 case ESType::Text
: return "text";
231 case ESType::Keyword
: return "keyword";
232 case ESType::Long
: return "long";
233 case ESType::Integer
: return "integer";
234 case ESType::Short
: return "short";
235 case ESType::Byte
: return "byte";
236 case ESType::Double
: return "double";
237 case ESType::Float
: return "float";
238 case ESType::Half_Float
: return "half_float";
239 case ESType::Scaled_Float
: return "scaled_float";
240 case ESType::Date
: return "date";
241 case ESType::Boolean
: return "boolean";
242 case ESType::Integer_Range
: return "integer_range";
243 case ESType::Float_Range
: return "float_range";
244 case ESType::Double_Range
: return "date_range";
245 case ESType::Date_Range
: return "date_range";
246 case ESType::Geo_Point
: return "geo_point";
247 case ESType::Ip
: return "ip";
255 const char *format
{nullptr};
256 boost::optional
<bool> analyzed
;
258 es_type_v2(ESType et
) : estype(et
) {}
260 void dump(Formatter
*f
) const {
261 const char *type_str
= es_type_to_str(estype
);
262 encode_json("type", type_str
, f
);
264 encode_json("format", format
, f
);
267 auto is_analyzed
= analyzed
;
269 if (estype
== ESType::String
&&
275 encode_json("index", (is_analyzed
.value() ? "analyzed" : "not_analyzed"), f
);
282 const char *format
{nullptr};
283 boost::optional
<bool> analyzed
;
284 boost::optional
<bool> index
;
286 es_type_v5(ESType et
) : estype(et
) {}
288 void dump(Formatter
*f
) const {
290 if (estype
!= ESType::String
) {
293 bool is_analyzed
= analyzed
.value_or(false);
294 new_estype
= (is_analyzed
? ESType::Text
: ESType::Keyword
);
295 /* index = true; ... Not setting index=true, because that's the default,
296 * and dumping a boolean value *might* be a problem when backporting this
297 * because value might get quoted
301 const char *type_str
= es_type_to_str(new_estype
);
302 encode_json("type", type_str
, f
);
304 encode_json("format", format
, f
);
307 encode_json("index", index
.value(), f
);
/* Wraps a versioned field encoder T and adds chainable setters. */
template <class T>
struct es_type : public T {
  es_type(T t) : T(t) {}

  es_type& set_format(const char *f) {
    this->format = f;
    return *this;
  }

  es_type& set_analyzed(bool a) {
    this->analyzed = a;
    return *this;
  }
};
327 struct es_index_mappings
{
328 ESType string_type
{ESType::String
};
330 es_type
<T
> est(ESType t
) const {
331 return es_type
<T
>(t
);
334 void dump_custom(const char *section
, ESType type
, const char *format
, Formatter
*f
) const {
335 f
->open_object_section(section
);
336 ::encode_json("type", "nested", f
);
337 f
->open_object_section("properties");
338 encode_json("name", est(string_type
), f
);
339 encode_json("value", est(type
).set_format(format
), f
);
340 f
->close_section(); // entry
341 f
->close_section(); // custom-string
344 void dump(Formatter
*f
) const {
345 f
->open_object_section("object");
346 f
->open_object_section("properties");
347 encode_json("bucket", est(string_type
), f
);
348 encode_json("name", est(string_type
), f
);
349 encode_json("instance", est(string_type
), f
);
350 encode_json("versioned_epoch", est(ESType::Long
), f
);
351 f
->open_object_section("meta");
352 f
->open_object_section("properties");
353 encode_json("cache_control", est(string_type
), f
);
354 encode_json("content_disposition", est(string_type
), f
);
355 encode_json("content_encoding", est(string_type
), f
);
356 encode_json("content_language", est(string_type
), f
);
357 encode_json("content_type", est(string_type
), f
);
358 encode_json("storage_class", est(string_type
), f
);
359 encode_json("etag", est(string_type
), f
);
360 encode_json("expires", est(string_type
), f
);
361 encode_json("mtime", est(ESType::Date
)
362 .set_format("strict_date_optional_time||epoch_millis"), f
);
363 encode_json("size", est(ESType::Long
), f
);
364 dump_custom("custom-string", string_type
, nullptr, f
);
365 dump_custom("custom-int", ESType::Long
, nullptr, f
);
366 dump_custom("custom-date", ESType::Date
, "strict_date_optional_time||epoch_millis", f
);
367 f
->close_section(); // properties
368 f
->close_section(); // meta
369 f
->close_section(); // properties
370 f
->close_section(); // object
374 struct es_index_settings
{
375 uint32_t num_replicas
;
378 es_index_settings(uint32_t _replicas
, uint32_t _shards
) : num_replicas(_replicas
), num_shards(_shards
) {}
380 void dump(Formatter
*f
) const {
381 encode_json("number_of_replicas", num_replicas
, f
);
382 encode_json("number_of_shards", num_shards
, f
);
386 struct es_index_config_base
{
387 virtual ~es_index_config_base() {}
388 virtual void dump(Formatter
*f
) const = 0;
392 struct es_index_config
: public es_index_config_base
{
393 es_index_settings settings
;
394 es_index_mappings
<T
> mappings
;
396 es_index_config(es_index_settings
& _s
) : settings(_s
) {}
398 void dump(Formatter
*f
) const {
399 encode_json("settings", settings
, f
);
400 encode_json("mappings", mappings
, f
);
// Returns true when attr_name equals one of the RGW attribute names listed in
// rgw_sys_attrs (std::find compares each const char* entry against the string).
// es_obj_metadata::dump uses this to keep such attributes out of the indexed
// document.
// NOTE(review): this listing of the initializer list appears incomplete —
// confirm the full set of excluded attributes against the complete file.
404 static bool is_sys_attr(const std::string
& attr_name
){
405 static constexpr std::initializer_list
<const char*> rgw_sys_attrs
= {RGW_ATTR_PG_VER
,
406 RGW_ATTR_SOURCE_ZONE
,
408 RGW_ATTR_TEMPURL_KEY1
,
409 RGW_ATTR_TEMPURL_KEY2
,
// linear scan over a short, fixed list
414 return std::find(rgw_sys_attrs
.begin(), rgw_sys_attrs
.end(), attr_name
) != rgw_sys_attrs
.end();
417 static size_t attr_len(const bufferlist
& val
)
419 size_t len
= val
.length();
420 if (len
&& val
[len
- 1] == '\0') {
427 struct es_obj_metadata
{
429 ElasticConfigRef es_conf
;
430 RGWBucketInfo bucket_info
;
432 ceph::real_time mtime
;
434 map
<string
, bufferlist
> attrs
;
435 uint64_t versioned_epoch
;
437 es_obj_metadata(CephContext
*_cct
, ElasticConfigRef _es_conf
, const RGWBucketInfo
& _bucket_info
,
438 const rgw_obj_key
& _key
, ceph::real_time
& _mtime
, uint64_t _size
,
439 map
<string
, bufferlist
>& _attrs
, uint64_t _versioned_epoch
) : cct(_cct
), es_conf(_es_conf
), bucket_info(_bucket_info
), key(_key
),
440 mtime(_mtime
), size(_size
), attrs(std::move(_attrs
)), versioned_epoch(_versioned_epoch
) {}
442 void dump(Formatter
*f
) const {
443 map
<string
, string
> out_attrs
;
444 map
<string
, string
> custom_meta
;
445 RGWAccessControlPolicy policy
;
446 set
<string
> permissions
;
449 for (auto i
: attrs
) {
450 const string
& attr_name
= i
.first
;
451 bufferlist
& val
= i
.second
;
453 if (!boost::algorithm::starts_with(attr_name
, RGW_ATTR_PREFIX
)) {
457 if (boost::algorithm::starts_with(attr_name
, RGW_ATTR_META_PREFIX
)) {
458 custom_meta
.emplace(attr_name
.substr(sizeof(RGW_ATTR_META_PREFIX
) - 1),
459 string(val
.c_str(), attr_len(val
)));
463 if (boost::algorithm::starts_with(attr_name
, RGW_ATTR_CRYPT_PREFIX
)) {
467 if (boost::algorithm::starts_with(attr_name
, RGW_ATTR_OLH_PREFIX
)) {
468 // skip versioned object olh info
472 if (attr_name
== RGW_ATTR_ACL
) {
474 auto i
= val
.begin();
476 } catch (buffer::error
& err
) {
477 ldout(cct
, 0) << "ERROR: failed to decode acl for " << bucket_info
.bucket
<< "/" << key
<< dendl
;
481 const RGWAccessControlList
& acl
= policy
.get_acl();
483 permissions
.insert(policy
.get_owner().get_id().to_str());
484 for (auto acliter
: acl
.get_grant_map()) {
485 const ACLGrant
& grant
= acliter
.second
;
486 if (grant
.get_type().get_type() == ACL_TYPE_CANON_USER
&&
487 ((uint32_t)grant
.get_permission().get_permissions() & RGW_PERM_READ
) != 0) {
489 if (grant
.get_id(user
)) {
490 permissions
.insert(user
.to_str());
494 } else if (attr_name
== RGW_ATTR_TAGS
) {
496 auto tags_bl
= val
.begin();
497 ::decode(obj_tags
, tags_bl
);
498 } catch (buffer::error
& err
) {
499 ldout(cct
,0) << "ERROR: failed to decode obj tags for "
500 << bucket_info
.bucket
<< "/" << key
<< dendl
;
503 } else if (attr_name
== RGW_ATTR_COMPRESSION
) {
504 RGWCompressionInfo cs_info
;
506 auto vals_bl
= val
.begin();
507 ::decode(cs_info
, vals_bl
);
508 } catch (buffer::error
& err
) {
509 ldout(cct
,0) << "ERROR: failed to decode compression attr for "
510 << bucket_info
.bucket
<< "/" << key
<< dendl
;
513 out_attrs
.emplace("compression",std::move(cs_info
.compression_type
));
515 if (!is_sys_attr(attr_name
)) {
516 out_attrs
.emplace(attr_name
.substr(sizeof(RGW_ATTR_PREFIX
) - 1),
517 std::string(val
.c_str(), attr_len(val
)));
521 ::encode_json("bucket", bucket_info
.bucket
.name
, f
);
522 ::encode_json("name", key
.name
, f
);
523 string instance
= key
.instance
;
524 if (instance
.empty())
526 ::encode_json("instance", instance
, f
);
527 ::encode_json("versioned_epoch", versioned_epoch
, f
);
528 ::encode_json("owner", policy
.get_owner(), f
);
529 ::encode_json("permissions", permissions
, f
);
530 f
->open_object_section("meta");
531 ::encode_json("size", size
, f
);
534 rgw_to_iso8601(mtime
, &mtime_str
);
535 ::encode_json("mtime", mtime_str
, f
);
536 for (auto i
: out_attrs
) {
537 ::encode_json(i
.first
.c_str(), i
.second
, f
);
539 map
<string
, string
> custom_str
;
540 map
<string
, string
> custom_int
;
541 map
<string
, string
> custom_date
;
543 for (auto i
: custom_meta
) {
544 auto config
= bucket_info
.mdsearch_config
.find(i
.first
);
545 if (config
== bucket_info
.mdsearch_config
.end()) {
546 if (!es_conf
->explicit_custom_meta
) {
547 /* default custom meta is of type string */
548 custom_str
[i
.first
] = i
.second
;
550 ldout(cct
, 20) << "custom meta entry key=" << i
.first
<< " not found in bucket mdsearch config: " << bucket_info
.mdsearch_config
<< dendl
;
554 switch (config
->second
) {
555 case ESEntityTypeMap::ES_ENTITY_DATE
:
556 custom_date
[i
.first
] = i
.second
;
558 case ESEntityTypeMap::ES_ENTITY_INT
:
559 custom_int
[i
.first
] = i
.second
;
562 custom_str
[i
.first
] = i
.second
;
566 if (!custom_str
.empty()) {
567 f
->open_array_section("custom-string");
568 for (auto i
: custom_str
) {
569 f
->open_object_section("entity");
570 ::encode_json("name", i
.first
.c_str(), f
);
571 ::encode_json("value", i
.second
, f
);
576 if (!custom_int
.empty()) {
577 f
->open_array_section("custom-int");
578 for (auto i
: custom_int
) {
579 f
->open_object_section("entity");
580 ::encode_json("name", i
.first
.c_str(), f
);
581 ::encode_json("value", i
.second
, f
);
586 if (!custom_date
.empty()) {
587 f
->open_array_section("custom-date");
588 for (auto i
: custom_date
) {
590 * try to exlicitly parse date field, otherwise elasticsearch could reject the whole doc,
591 * which will end up with failed sync
594 int r
= parse_time(i
.second
.c_str(), &t
);
596 ldout(cct
, 20) << __func__
<< "(): failed to parse time (" << i
.second
<< "), skipping encoding of custom date attribute" << dendl
;
601 rgw_to_iso8601(t
, &time_str
);
603 f
->open_object_section("entity");
604 ::encode_json("name", i
.first
.c_str(), f
);
605 ::encode_json("value", time_str
.c_str(), f
);
610 f
->close_section(); // meta
611 const auto& m
= obj_tags
.get_tags();
613 f
->open_array_section("tagging");
614 for (const auto &it
: m
) {
615 f
->open_object_section("tag");
616 ::encode_json("key", it
.first
, f
);
617 ::encode_json("value",it
.second
, f
);
620 f
->close_section(); // tagging
625 class RGWElasticInitConfigCBCR
: public RGWCoroutine
{
626 RGWDataSyncEnv
*sync_env
;
627 ElasticConfigRef conf
;
630 struct _err_response
{
632 vector
<err_reason
> root_cause
;
637 void decode_json(JSONObj
*obj
) {
638 JSONDecoder::decode_json("root_cause", root_cause
, obj
);
639 JSONDecoder::decode_json("type", type
, obj
);
640 JSONDecoder::decode_json("reason", reason
, obj
);
641 JSONDecoder::decode_json("index", index
, obj
);
645 void decode_json(JSONObj
*obj
) {
646 JSONDecoder::decode_json("error", error
, obj
);
651 RGWElasticInitConfigCBCR(RGWDataSyncEnv
*_sync_env
,
652 ElasticConfigRef _conf
) : RGWCoroutine(_sync_env
->cct
),
655 int operate() override
{
657 ldout(sync_env
->cct
, 0) << ": init elasticsearch config zone=" << sync_env
->source_zone
<< dendl
;
658 yield
call(new RGWReadRESTResourceCR
<ESInfo
> (sync_env
->cct
,
660 sync_env
->http_manager
,
661 "/", nullptr /*params*/,
662 &(conf
->default_headers
),
665 return set_cr_error(retcode
);
669 string path
= conf
->get_index_path();
670 ldout(sync_env
->cct
, 5) << "got elastic version=" << es_info
.get_version_str() << dendl
;
672 es_index_settings
settings(conf
->num_replicas
, conf
->num_shards
);
674 std::unique_ptr
<es_index_config_base
> index_conf
;
676 if (es_info
.version
>= ES_V5
) {
677 ldout(sync_env
->cct
, 0) << "elasticsearch: index mapping: version >= 5" << dendl
;
678 index_conf
.reset(new es_index_config
<es_type_v5
>(settings
));
680 ldout(sync_env
->cct
, 0) << "elasticsearch: index mapping: version < 5" << dendl
;
681 index_conf
.reset(new es_index_config
<es_type_v2
>(settings
));
683 call(new RGWPutRESTResourceCR
<es_index_config_base
, int, _err_response
> (sync_env
->cct
,
685 sync_env
->http_manager
,
686 path
, nullptr /*params*/,
687 &(conf
->default_headers
),
688 *index_conf
, nullptr, &err_response
));
691 ldout(sync_env
->cct
, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response
.error
.type
<< " response.reason=" << err_response
.error
.reason
<< dendl
;
693 if (err_response
.error
.type
!= "index_already_exists_exception") {
694 return set_cr_error(retcode
);
697 ldout(sync_env
->cct
, 0) << "elasticsearch: index already exists, assuming external initialization" << dendl
;
699 return set_cr_done();
706 class RGWElasticHandleRemoteObjCBCR
: public RGWStatRemoteObjCBCR
{
707 ElasticConfigRef conf
;
708 uint64_t versioned_epoch
;
710 RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv
*_sync_env
,
711 RGWBucketInfo
& _bucket_info
, rgw_obj_key
& _key
,
712 ElasticConfigRef _conf
, uint64_t _versioned_epoch
) : RGWStatRemoteObjCBCR(_sync_env
, _bucket_info
, _key
), conf(_conf
),
713 versioned_epoch(_versioned_epoch
) {}
714 int operate() override
{
716 ldout(sync_env
->cct
, 10) << ": stat of remote obj: z=" << sync_env
->source_zone
717 << " b=" << bucket_info
.bucket
<< " k=" << key
718 << " size=" << size
<< " mtime=" << mtime
<< dendl
;
721 string path
= conf
->get_obj_path(bucket_info
, key
);
722 es_obj_metadata
doc(sync_env
->cct
, conf
, bucket_info
, key
, mtime
, size
, attrs
, versioned_epoch
);
724 call(new RGWPutRESTResourceCR
<es_obj_metadata
, int>(sync_env
->cct
, conf
->conn
.get(),
725 sync_env
->http_manager
,
726 path
, nullptr /* params */,
727 &(conf
->default_headers
),
728 doc
, nullptr /* result */));
732 return set_cr_error(retcode
);
734 return set_cr_done();
740 class RGWElasticHandleRemoteObjCR
: public RGWCallStatRemoteObjCR
{
741 ElasticConfigRef conf
;
742 uint64_t versioned_epoch
;
744 RGWElasticHandleRemoteObjCR(RGWDataSyncEnv
*_sync_env
,
745 RGWBucketInfo
& _bucket_info
, rgw_obj_key
& _key
,
746 ElasticConfigRef _conf
, uint64_t _versioned_epoch
) : RGWCallStatRemoteObjCR(_sync_env
, _bucket_info
, _key
),
747 conf(_conf
), versioned_epoch(_versioned_epoch
) {
750 ~RGWElasticHandleRemoteObjCR() override
{}
752 RGWStatRemoteObjCBCR
*allocate_callback() override
{
753 return new RGWElasticHandleRemoteObjCBCR(sync_env
, bucket_info
, key
, conf
, versioned_epoch
);
757 class RGWElasticRemoveRemoteObjCBCR
: public RGWCoroutine
{
758 RGWDataSyncEnv
*sync_env
;
759 RGWBucketInfo bucket_info
;
761 ceph::real_time mtime
;
762 ElasticConfigRef conf
;
764 RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv
*_sync_env
,
765 RGWBucketInfo
& _bucket_info
, rgw_obj_key
& _key
, const ceph::real_time
& _mtime
,
766 ElasticConfigRef _conf
) : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
767 bucket_info(_bucket_info
), key(_key
),
768 mtime(_mtime
), conf(_conf
) {}
769 int operate() override
{
771 ldout(sync_env
->cct
, 10) << ": remove remote obj: z=" << sync_env
->source_zone
772 << " b=" << bucket_info
.bucket
<< " k=" << key
<< " mtime=" << mtime
<< dendl
;
774 string path
= conf
->get_obj_path(bucket_info
, key
);
776 call(new RGWDeleteRESTResourceCR(sync_env
->cct
, conf
->conn
.get(),
777 sync_env
->http_manager
,
778 path
, nullptr /* params */));
781 return set_cr_error(retcode
);
783 return set_cr_done();
790 class RGWElasticDataSyncModule
: public RGWDataSyncModule
{
791 ElasticConfigRef conf
;
793 RGWElasticDataSyncModule(CephContext
*cct
, const map
<string
, string
, ltstr_nocase
>& config
) : conf(std::make_shared
<ElasticConfig
>()) {
794 conf
->init(cct
, config
);
796 ~RGWElasticDataSyncModule() override
{}
798 void init(RGWDataSyncEnv
*sync_env
, uint64_t instance_id
) override
{
799 conf
->init_instance(sync_env
->store
->get_realm(), instance_id
);
802 RGWCoroutine
*init_sync(RGWDataSyncEnv
*sync_env
) override
{
803 ldout(sync_env
->cct
, 5) << conf
->id
<< ": init" << dendl
;
804 return new RGWElasticInitConfigCBCR(sync_env
, conf
);
806 RGWCoroutine
*sync_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, boost::optional
<uint64_t> versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
807 ldout(sync_env
->cct
, 10) << conf
->id
<< ": sync_object: b=" << bucket_info
.bucket
<< " k=" << key
<< " versioned_epoch=" << versioned_epoch
.value_or(0) << dendl
;
808 if (!conf
->should_handle_operation(bucket_info
)) {
809 ldout(sync_env
->cct
, 10) << conf
->id
<< ": skipping operation (bucket not approved)" << dendl
;
812 return new RGWElasticHandleRemoteObjCR(sync_env
, bucket_info
, key
, conf
, versioned_epoch
.value_or(0));
814 RGWCoroutine
*remove_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
815 /* versioned and versioned epoch params are useless in the elasticsearch backend case */
816 ldout(sync_env
->cct
, 10) << conf
->id
<< ": rm_object: b=" << bucket_info
.bucket
<< " k=" << key
<< " mtime=" << mtime
<< " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
817 if (!conf
->should_handle_operation(bucket_info
)) {
818 ldout(sync_env
->cct
, 10) << conf
->id
<< ": skipping operation (bucket not approved)" << dendl
;
821 return new RGWElasticRemoveRemoteObjCBCR(sync_env
, bucket_info
, key
, mtime
, conf
);
823 RGWCoroutine
*create_delete_marker(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
,
824 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
825 ldout(sync_env
->cct
, 10) << conf
->id
<< ": create_delete_marker: b=" << bucket_info
.bucket
<< " k=" << key
<< " mtime=" << mtime
826 << " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
827 ldout(sync_env
->cct
, 10) << conf
->id
<< ": skipping operation (not handled)" << dendl
;
830 RGWRESTConn
*get_rest_conn() {
831 return conf
->conn
.get();
834 string
get_index_path() {
835 return conf
->get_index_path();
838 map
<string
, string
>& get_request_headers() {
839 return conf
->get_request_headers();
843 RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext
*cct
, const map
<string
, string
, ltstr_nocase
>& config
)
845 data_handler
= std::unique_ptr
<RGWElasticDataSyncModule
>(new RGWElasticDataSyncModule(cct
, config
));
848 RGWDataSyncModule
*RGWElasticSyncModuleInstance::get_data_handler()
850 return data_handler
.get();
853 RGWRESTConn
*RGWElasticSyncModuleInstance::get_rest_conn()
855 return data_handler
->get_rest_conn();
858 string
RGWElasticSyncModuleInstance::get_index_path() {
859 return data_handler
->get_index_path();
862 map
<string
, string
>& RGWElasticSyncModuleInstance::get_request_headers() {
863 return data_handler
->get_request_headers();
// REST filter hook: returns a newly allocated RGWRESTMgr_MDSearch_S3 manager,
// presumably only for the S3 dialect (the branch after the dialect check is
// not visible here).
// NOTE(review): what happens to `orig` on either path is not visible in this
// excerpt — confirm ownership of `orig` before changing this function.
866 RGWRESTMgr
*RGWElasticSyncModuleInstance::get_rest_filter(int dialect
, RGWRESTMgr
*orig
) {
867 if (dialect
!= RGW_REST_S3
) {
871 return new RGWRESTMgr_MDSearch_S3();
874 int RGWElasticSyncModule::create_instance(CephContext
*cct
, map
<string
, string
, ltstr_nocase
>& config
, RGWSyncModuleInstanceRef
*instance
) {
876 auto i
= config
.find("endpoint");
877 if (i
!= config
.end()) {
878 endpoint
= i
->second
;
880 instance
->reset(new RGWElasticSyncModuleInstance(cct
, config
));