1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include "rgw_common.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_sync_module.h"
8 #include "rgw_data_sync.h"
9 #include "rgw_sync_module_es.h"
10 #include "rgw_sync_module_es_rest.h"
11 #include "rgw_rest_conn.h"
12 #include "rgw_cr_rest.h"
14 #include "rgw_es_query.h"
17 #include "services/svc_zone.h"
19 #include "include/str_list.h"
21 #include <boost/asio/yield.hpp>
23 #define dout_subsys ceph_subsys_rgw
27 * whitelist utility. Config string is a list of entries, where an entry is either an item,
28 * a prefix, or a suffix. An item would be the name of the entity that we'd look up,
29 * a prefix would be a string ending with an asterisk, a suffix would be a string starting
30 * with an asterisk. For example:
32 * bucket1, bucket2, foo*, *bar
35 bool approve_all
{false};
41 void parse(const string
& str
) {
44 get_str_list(str
, ",", l
);
46 for (auto& entry
: l
) {
47 entry
= rgw_trim_whitespace(entry
);
57 if (entry
[0] == '*') {
58 suffixes
.insert(entry
.substr(1));
62 if (entry
.back() == '*') {
63 prefixes
.insert(entry
.substr(0, entry
.size() - 1));
67 entries
.insert(entry
);
73 void init(const string
& str
, bool def_val
) {
75 approve_all
= def_val
;
81 bool exists(const string
& entry
) {
86 if (entries
.find(entry
) != entries
.end()) {
90 auto i
= prefixes
.upper_bound(entry
);
91 if (i
!= prefixes
.begin()) {
93 if (boost::algorithm::starts_with(entry
, *i
)) {
98 for (i
= suffixes
.begin(); i
!= suffixes
.end(); ++i
) {
99 if (boost::algorithm::ends_with(entry
, *i
)) {
// Lower bound applied to the configured "num_shards".
#define ES_NUM_SHARDS_MIN 5

// Defaults used when "num_shards" / "num_replicas" are absent from the config.
#define ES_NUM_SHARDS_DEFAULT 16
#define ES_NUM_REPLICAS_DEFAULT 1

// Elasticsearch version as {major, minor}; pair ordering compares major first.
typedef std::pair<int, int> ESVersion;
// First version that gets the es_type_v5 index mapping.
static constexpr ESVersion ES_V5 = ESVersion(5, 0);
118 std::string cluster_name
;
119 std::string cluster_uuid
;
122 void decode_json(JSONObj
*obj
);
124 std::string
get_version_str(){
125 return std::to_string(version
.first
) + "." + std::to_string(version
.second
);
129 // simple wrapper structure to wrap the es version nested type
130 struct es_version_decoder
{
133 int parse_version(const std::string
& s
) {
135 int ret
= sscanf(s
.c_str(), "%d.%d", &major
, &minor
);
139 version
= std::make_pair(major
,minor
);
143 void decode_json(JSONObj
*obj
) {
145 JSONDecoder::decode_json("number",s
,obj
);
146 if (parse_version(s
) < 0)
147 throw JSONDecoder::err("Failed to parse ElasticVersion");
152 void ESInfo::decode_json(JSONObj
*obj
)
154 JSONDecoder::decode_json("name", name
, obj
);
155 JSONDecoder::decode_json("cluster_name", cluster_name
, obj
);
156 JSONDecoder::decode_json("cluster_uuid", cluster_uuid
, obj
);
157 es_version_decoder esv
;
158 JSONDecoder::decode_json("version", esv
, obj
);
159 version
= std::move(esv
.version
);
162 struct ElasticConfig
{
163 uint64_t sync_instance
{0};
166 std::unique_ptr
<RGWRESTConn
> conn
;
167 bool explicit_custom_meta
{true};
168 string override_index_path
;
169 ItemList index_buckets
;
170 ItemList allow_owners
;
171 uint32_t num_shards
{0};
172 uint32_t num_replicas
{0};
173 std::map
<string
,string
> default_headers
= {{ "Content-Type", "application/json" }};
175 void init(CephContext
*cct
, const JSONFormattable
& config
) {
176 string elastic_endpoint
= config
["endpoint"];
177 id
= string("elastic:") + elastic_endpoint
;
178 conn
.reset(new RGWRESTConn(cct
, nullptr, id
, { elastic_endpoint
}));
179 explicit_custom_meta
= config
["explicit_custom_meta"](true);
180 index_buckets
.init(config
["index_buckets_list"], true); /* approve all buckets by default */
181 allow_owners
.init(config
["approved_owners_list"], true); /* approve all bucket owners by default */
182 override_index_path
= config
["override_index_path"];
183 num_shards
= config
["num_shards"](ES_NUM_SHARDS_DEFAULT
);
184 if (num_shards
< ES_NUM_SHARDS_MIN
) {
185 num_shards
= ES_NUM_SHARDS_MIN
;
187 num_replicas
= config
["num_replicas"](ES_NUM_REPLICAS_DEFAULT
);
188 if (string user
= config
["username"], pw
= config
["password"];
189 !user
.empty() && !pw
.empty()) {
190 auto auth_string
= user
+ ":" + pw
;
191 default_headers
.emplace("AUTHORIZATION", "Basic " + rgw::to_base64(auth_string
));
196 void init_instance(const RGWRealm
& realm
, uint64_t instance_id
) {
197 sync_instance
= instance_id
;
199 if (!override_index_path
.empty()) {
200 index_path
= override_index_path
;
205 snprintf(buf
, sizeof(buf
), "-%08x", (uint32_t)(sync_instance
& 0xFFFFFFFF));
207 index_path
= "/rgw-" + realm
.get_name() + buf
;
210 string
get_index_path() {
214 map
<string
, string
>& get_request_headers() {
215 return default_headers
;
218 string
get_obj_path(const RGWBucketInfo
& bucket_info
, const rgw_obj_key
& key
) {
219 return index_path
+ "/object/" + url_encode(bucket_info
.bucket
.bucket_id
+ ":" + key
.name
+ ":" + (key
.instance
.empty() ? "null" : key
.instance
));
222 bool should_handle_operation(RGWBucketInfo
& bucket_info
) {
223 return index_buckets
.exists(bucket_info
.bucket
.name
) &&
224 allow_owners
.exists(bucket_info
.owner
.to_str());
228 using ElasticConfigRef
= std::shared_ptr
<ElasticConfig
>;
230 static const char *es_type_to_str(const ESType
& t
) {
232 case ESType::String
: return "string";
233 case ESType::Text
: return "text";
234 case ESType::Keyword
: return "keyword";
235 case ESType::Long
: return "long";
236 case ESType::Integer
: return "integer";
237 case ESType::Short
: return "short";
238 case ESType::Byte
: return "byte";
239 case ESType::Double
: return "double";
240 case ESType::Float
: return "float";
241 case ESType::Half_Float
: return "half_float";
242 case ESType::Scaled_Float
: return "scaled_float";
243 case ESType::Date
: return "date";
244 case ESType::Boolean
: return "boolean";
245 case ESType::Integer_Range
: return "integer_range";
246 case ESType::Float_Range
: return "float_range";
247 case ESType::Double_Range
: return "date_range";
248 case ESType::Date_Range
: return "date_range";
249 case ESType::Geo_Point
: return "geo_point";
250 case ESType::Ip
: return "ip";
258 const char *format
{nullptr};
259 std::optional
<bool> analyzed
;
261 es_type_v2(ESType et
) : estype(et
) {}
263 void dump(Formatter
*f
) const {
264 const char *type_str
= es_type_to_str(estype
);
265 encode_json("type", type_str
, f
);
267 encode_json("format", format
, f
);
270 auto is_analyzed
= analyzed
;
272 if (estype
== ESType::String
&&
278 encode_json("index", (is_analyzed
.value() ? "analyzed" : "not_analyzed"), f
);
285 const char *format
{nullptr};
286 std::optional
<bool> analyzed
;
287 std::optional
<bool> index
;
289 es_type_v5(ESType et
) : estype(et
) {}
291 void dump(Formatter
*f
) const {
293 if (estype
!= ESType::String
) {
296 bool is_analyzed
= analyzed
.value_or(false);
297 new_estype
= (is_analyzed
? ESType::Text
: ESType::Keyword
);
298 /* index = true; ... Not setting index=true, because that's the default,
299 * and dumping a boolean value *might* be a problem when backporting this
300 * because value might get quoted
304 const char *type_str
= es_type_to_str(new_estype
);
305 encode_json("type", type_str
, f
);
307 encode_json("format", format
, f
);
310 encode_json("index", index
.value(), f
);
/*
 * Version-specific field encoder (T is es_type_v2 or es_type_v5) extended
 * with chainable setters, e.g. est(ESType::Date).set_format(...).
 */
template <class T>
struct es_type : public T {
  es_type(T t) : T(t) {}

  es_type& set_format(const char *f) {
    this->format = f;
    return *this;
  }

  es_type& set_analyzed(bool a) {
    this->analyzed = a;
    return *this;
  }
};
330 struct es_index_mappings
{
331 ESType string_type
{ESType::String
};
333 es_type
<T
> est(ESType t
) const {
334 return es_type
<T
>(t
);
337 void dump_custom(const char *section
, ESType type
, const char *format
, Formatter
*f
) const {
338 f
->open_object_section(section
);
339 ::encode_json("type", "nested", f
);
340 f
->open_object_section("properties");
341 encode_json("name", est(string_type
), f
);
342 encode_json("value", est(type
).set_format(format
), f
);
343 f
->close_section(); // entry
344 f
->close_section(); // custom-string
347 void dump(Formatter
*f
) const {
348 f
->open_object_section("object");
349 f
->open_object_section("properties");
350 encode_json("bucket", est(string_type
), f
);
351 encode_json("name", est(string_type
), f
);
352 encode_json("instance", est(string_type
), f
);
353 encode_json("versioned_epoch", est(ESType::Long
), f
);
354 f
->open_object_section("meta");
355 f
->open_object_section("properties");
356 encode_json("cache_control", est(string_type
), f
);
357 encode_json("content_disposition", est(string_type
), f
);
358 encode_json("content_encoding", est(string_type
), f
);
359 encode_json("content_language", est(string_type
), f
);
360 encode_json("content_type", est(string_type
), f
);
361 encode_json("storage_class", est(string_type
), f
);
362 encode_json("etag", est(string_type
), f
);
363 encode_json("expires", est(string_type
), f
);
364 encode_json("mtime", est(ESType::Date
)
365 .set_format("strict_date_optional_time||epoch_millis"), f
);
366 encode_json("size", est(ESType::Long
), f
);
367 dump_custom("custom-string", string_type
, nullptr, f
);
368 dump_custom("custom-int", ESType::Long
, nullptr, f
);
369 dump_custom("custom-date", ESType::Date
, "strict_date_optional_time||epoch_millis", f
);
370 f
->close_section(); // properties
371 f
->close_section(); // meta
372 f
->close_section(); // properties
373 f
->close_section(); // object
377 struct es_index_settings
{
378 uint32_t num_replicas
;
381 es_index_settings(uint32_t _replicas
, uint32_t _shards
) : num_replicas(_replicas
), num_shards(_shards
) {}
383 void dump(Formatter
*f
) const {
384 encode_json("number_of_replicas", num_replicas
, f
);
385 encode_json("number_of_shards", num_shards
, f
);
389 struct es_index_config_base
{
390 virtual ~es_index_config_base() {}
391 virtual void dump(Formatter
*f
) const = 0;
395 struct es_index_config
: public es_index_config_base
{
396 es_index_settings settings
;
397 es_index_mappings
<T
> mappings
;
399 es_index_config(es_index_settings
& _s
) : settings(_s
) {}
401 void dump(Formatter
*f
) const {
402 encode_json("settings", settings
, f
);
403 encode_json("mappings", mappings
, f
);
407 static bool is_sys_attr(const std::string
& attr_name
){
408 static constexpr std::initializer_list
<const char*> rgw_sys_attrs
=
410 RGW_ATTR_SOURCE_ZONE
,
412 RGW_ATTR_TEMPURL_KEY1
,
413 RGW_ATTR_TEMPURL_KEY2
,
418 return std::find(rgw_sys_attrs
.begin(), rgw_sys_attrs
.end(), attr_name
) != rgw_sys_attrs
.end();
421 static size_t attr_len(const bufferlist
& val
)
423 size_t len
= val
.length();
424 if (len
&& val
[len
- 1] == '\0') {
431 struct es_obj_metadata
{
433 ElasticConfigRef es_conf
;
434 RGWBucketInfo bucket_info
;
436 ceph::real_time mtime
;
438 map
<string
, bufferlist
> attrs
;
439 uint64_t versioned_epoch
;
441 es_obj_metadata(CephContext
*_cct
, ElasticConfigRef _es_conf
, const RGWBucketInfo
& _bucket_info
,
442 const rgw_obj_key
& _key
, ceph::real_time
& _mtime
, uint64_t _size
,
443 map
<string
, bufferlist
>& _attrs
, uint64_t _versioned_epoch
) : cct(_cct
), es_conf(_es_conf
), bucket_info(_bucket_info
), key(_key
),
444 mtime(_mtime
), size(_size
), attrs(std::move(_attrs
)), versioned_epoch(_versioned_epoch
) {}
446 void dump(Formatter
*f
) const {
447 map
<string
, string
> out_attrs
;
448 map
<string
, string
> custom_meta
;
449 RGWAccessControlPolicy policy
;
450 set
<string
> permissions
;
453 for (auto i
: attrs
) {
454 const string
& attr_name
= i
.first
;
455 bufferlist
& val
= i
.second
;
457 if (!boost::algorithm::starts_with(attr_name
, RGW_ATTR_PREFIX
)) {
461 if (boost::algorithm::starts_with(attr_name
, RGW_ATTR_META_PREFIX
)) {
462 custom_meta
.emplace(attr_name
.substr(sizeof(RGW_ATTR_META_PREFIX
) - 1),
463 string(val
.c_str(), attr_len(val
)));
467 if (boost::algorithm::starts_with(attr_name
, RGW_ATTR_CRYPT_PREFIX
)) {
471 if (boost::algorithm::starts_with(attr_name
, RGW_ATTR_OLH_PREFIX
)) {
472 // skip versioned object olh info
476 if (attr_name
== RGW_ATTR_ACL
) {
478 auto i
= val
.cbegin();
480 } catch (buffer::error
& err
) {
481 ldout(cct
, 0) << "ERROR: failed to decode acl for " << bucket_info
.bucket
<< "/" << key
<< dendl
;
485 const RGWAccessControlList
& acl
= policy
.get_acl();
487 permissions
.insert(policy
.get_owner().get_id().to_str());
488 for (auto acliter
: acl
.get_grant_map()) {
489 const ACLGrant
& grant
= acliter
.second
;
490 if (grant
.get_type().get_type() == ACL_TYPE_CANON_USER
&&
491 ((uint32_t)grant
.get_permission().get_permissions() & RGW_PERM_READ
) != 0) {
493 if (grant
.get_id(user
)) {
494 permissions
.insert(user
.to_str());
498 } else if (attr_name
== RGW_ATTR_TAGS
) {
500 auto tags_bl
= val
.cbegin();
501 decode(obj_tags
, tags_bl
);
502 } catch (buffer::error
& err
) {
503 ldout(cct
,0) << "ERROR: failed to decode obj tags for "
504 << bucket_info
.bucket
<< "/" << key
<< dendl
;
507 } else if (attr_name
== RGW_ATTR_COMPRESSION
) {
508 RGWCompressionInfo cs_info
;
510 auto vals_bl
= val
.cbegin();
511 decode(cs_info
, vals_bl
);
512 } catch (buffer::error
& err
) {
513 ldout(cct
,0) << "ERROR: failed to decode compression attr for "
514 << bucket_info
.bucket
<< "/" << key
<< dendl
;
517 out_attrs
.emplace("compression",std::move(cs_info
.compression_type
));
519 if (!is_sys_attr(attr_name
)) {
520 out_attrs
.emplace(attr_name
.substr(sizeof(RGW_ATTR_PREFIX
) - 1),
521 std::string(val
.c_str(), attr_len(val
)));
525 ::encode_json("bucket", bucket_info
.bucket
.name
, f
);
526 ::encode_json("name", key
.name
, f
);
527 string instance
= key
.instance
;
528 if (instance
.empty())
530 ::encode_json("instance", instance
, f
);
531 ::encode_json("versioned_epoch", versioned_epoch
, f
);
532 ::encode_json("owner", policy
.get_owner(), f
);
533 ::encode_json("permissions", permissions
, f
);
534 f
->open_object_section("meta");
535 ::encode_json("size", size
, f
);
538 rgw_to_iso8601(mtime
, &mtime_str
);
539 ::encode_json("mtime", mtime_str
, f
);
540 for (auto i
: out_attrs
) {
541 ::encode_json(i
.first
.c_str(), i
.second
, f
);
543 map
<string
, string
> custom_str
;
544 map
<string
, string
> custom_int
;
545 map
<string
, string
> custom_date
;
547 for (auto i
: custom_meta
) {
548 auto config
= bucket_info
.mdsearch_config
.find(i
.first
);
549 if (config
== bucket_info
.mdsearch_config
.end()) {
550 if (!es_conf
->explicit_custom_meta
) {
551 /* default custom meta is of type string */
552 custom_str
[i
.first
] = i
.second
;
554 ldout(cct
, 20) << "custom meta entry key=" << i
.first
<< " not found in bucket mdsearch config: " << bucket_info
.mdsearch_config
<< dendl
;
558 switch (config
->second
) {
559 case ESEntityTypeMap::ES_ENTITY_DATE
:
560 custom_date
[i
.first
] = i
.second
;
562 case ESEntityTypeMap::ES_ENTITY_INT
:
563 custom_int
[i
.first
] = i
.second
;
566 custom_str
[i
.first
] = i
.second
;
570 if (!custom_str
.empty()) {
571 f
->open_array_section("custom-string");
572 for (auto i
: custom_str
) {
573 f
->open_object_section("entity");
574 ::encode_json("name", i
.first
.c_str(), f
);
575 ::encode_json("value", i
.second
, f
);
580 if (!custom_int
.empty()) {
581 f
->open_array_section("custom-int");
582 for (auto i
: custom_int
) {
583 f
->open_object_section("entity");
584 ::encode_json("name", i
.first
.c_str(), f
);
585 ::encode_json("value", i
.second
, f
);
590 if (!custom_date
.empty()) {
591 f
->open_array_section("custom-date");
592 for (auto i
: custom_date
) {
594 * try to exlicitly parse date field, otherwise elasticsearch could reject the whole doc,
595 * which will end up with failed sync
598 int r
= parse_time(i
.second
.c_str(), &t
);
600 ldout(cct
, 20) << __func__
<< "(): failed to parse time (" << i
.second
<< "), skipping encoding of custom date attribute" << dendl
;
605 rgw_to_iso8601(t
, &time_str
);
607 f
->open_object_section("entity");
608 ::encode_json("name", i
.first
.c_str(), f
);
609 ::encode_json("value", time_str
.c_str(), f
);
614 f
->close_section(); // meta
615 const auto& m
= obj_tags
.get_tags();
617 f
->open_array_section("tagging");
618 for (const auto &it
: m
) {
619 f
->open_object_section("tag");
620 ::encode_json("key", it
.first
, f
);
621 ::encode_json("value",it
.second
, f
);
624 f
->close_section(); // tagging
629 class RGWElasticInitConfigCBCR
: public RGWCoroutine
{
630 RGWDataSyncEnv
*sync_env
;
631 ElasticConfigRef conf
;
634 struct _err_response
{
636 vector
<err_reason
> root_cause
;
641 void decode_json(JSONObj
*obj
) {
642 JSONDecoder::decode_json("root_cause", root_cause
, obj
);
643 JSONDecoder::decode_json("type", type
, obj
);
644 JSONDecoder::decode_json("reason", reason
, obj
);
645 JSONDecoder::decode_json("index", index
, obj
);
649 void decode_json(JSONObj
*obj
) {
650 JSONDecoder::decode_json("error", error
, obj
);
655 RGWElasticInitConfigCBCR(RGWDataSyncEnv
*_sync_env
,
656 ElasticConfigRef _conf
) : RGWCoroutine(_sync_env
->cct
),
659 int operate() override
{
661 ldout(sync_env
->cct
, 0) << ": init elasticsearch config zone=" << sync_env
->source_zone
<< dendl
;
662 yield
call(new RGWReadRESTResourceCR
<ESInfo
> (sync_env
->cct
,
664 sync_env
->http_manager
,
665 "/", nullptr /*params*/,
666 &(conf
->default_headers
),
669 return set_cr_error(retcode
);
673 string path
= conf
->get_index_path();
674 ldout(sync_env
->cct
, 5) << "got elastic version=" << es_info
.get_version_str() << dendl
;
676 es_index_settings
settings(conf
->num_replicas
, conf
->num_shards
);
678 std::unique_ptr
<es_index_config_base
> index_conf
;
680 if (es_info
.version
>= ES_V5
) {
681 ldout(sync_env
->cct
, 0) << "elasticsearch: index mapping: version >= 5" << dendl
;
682 index_conf
.reset(new es_index_config
<es_type_v5
>(settings
));
684 ldout(sync_env
->cct
, 0) << "elasticsearch: index mapping: version < 5" << dendl
;
685 index_conf
.reset(new es_index_config
<es_type_v2
>(settings
));
687 call(new RGWPutRESTResourceCR
<es_index_config_base
, int, _err_response
> (sync_env
->cct
,
689 sync_env
->http_manager
,
690 path
, nullptr /*params*/,
691 &(conf
->default_headers
),
692 *index_conf
, nullptr, &err_response
));
695 ldout(sync_env
->cct
, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response
.error
.type
<< " response.reason=" << err_response
.error
.reason
<< dendl
;
697 if (err_response
.error
.type
!= "index_already_exists_exception") {
698 return set_cr_error(retcode
);
701 ldout(sync_env
->cct
, 0) << "elasticsearch: index already exists, assuming external initialization" << dendl
;
703 return set_cr_done();
710 class RGWElasticHandleRemoteObjCBCR
: public RGWStatRemoteObjCBCR
{
711 ElasticConfigRef conf
;
712 uint64_t versioned_epoch
;
714 RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv
*_sync_env
,
715 RGWBucketInfo
& _bucket_info
, rgw_obj_key
& _key
,
716 ElasticConfigRef _conf
, uint64_t _versioned_epoch
) : RGWStatRemoteObjCBCR(_sync_env
, _bucket_info
, _key
), conf(_conf
),
717 versioned_epoch(_versioned_epoch
) {}
718 int operate() override
{
720 ldout(sync_env
->cct
, 10) << ": stat of remote obj: z=" << sync_env
->source_zone
721 << " b=" << bucket_info
.bucket
<< " k=" << key
722 << " size=" << size
<< " mtime=" << mtime
<< dendl
;
725 string path
= conf
->get_obj_path(bucket_info
, key
);
726 es_obj_metadata
doc(sync_env
->cct
, conf
, bucket_info
, key
, mtime
, size
, attrs
, versioned_epoch
);
728 call(new RGWPutRESTResourceCR
<es_obj_metadata
, int>(sync_env
->cct
, conf
->conn
.get(),
729 sync_env
->http_manager
,
730 path
, nullptr /* params */,
731 &(conf
->default_headers
),
732 doc
, nullptr /* result */));
736 return set_cr_error(retcode
);
738 return set_cr_done();
744 class RGWElasticHandleRemoteObjCR
: public RGWCallStatRemoteObjCR
{
745 ElasticConfigRef conf
;
746 uint64_t versioned_epoch
;
748 RGWElasticHandleRemoteObjCR(RGWDataSyncEnv
*_sync_env
,
749 RGWBucketInfo
& _bucket_info
, rgw_obj_key
& _key
,
750 ElasticConfigRef _conf
, uint64_t _versioned_epoch
) : RGWCallStatRemoteObjCR(_sync_env
, _bucket_info
, _key
),
751 conf(_conf
), versioned_epoch(_versioned_epoch
) {
754 ~RGWElasticHandleRemoteObjCR() override
{}
756 RGWStatRemoteObjCBCR
*allocate_callback() override
{
757 return new RGWElasticHandleRemoteObjCBCR(sync_env
, bucket_info
, key
, conf
, versioned_epoch
);
761 class RGWElasticRemoveRemoteObjCBCR
: public RGWCoroutine
{
762 RGWDataSyncEnv
*sync_env
;
763 RGWBucketInfo bucket_info
;
765 ceph::real_time mtime
;
766 ElasticConfigRef conf
;
768 RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv
*_sync_env
,
769 RGWBucketInfo
& _bucket_info
, rgw_obj_key
& _key
, const ceph::real_time
& _mtime
,
770 ElasticConfigRef _conf
) : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
771 bucket_info(_bucket_info
), key(_key
),
772 mtime(_mtime
), conf(_conf
) {}
773 int operate() override
{
775 ldout(sync_env
->cct
, 10) << ": remove remote obj: z=" << sync_env
->source_zone
776 << " b=" << bucket_info
.bucket
<< " k=" << key
<< " mtime=" << mtime
<< dendl
;
778 string path
= conf
->get_obj_path(bucket_info
, key
);
780 call(new RGWDeleteRESTResourceCR(sync_env
->cct
, conf
->conn
.get(),
781 sync_env
->http_manager
,
782 path
, nullptr /* params */));
785 return set_cr_error(retcode
);
787 return set_cr_done();
794 class RGWElasticDataSyncModule
: public RGWDataSyncModule
{
795 ElasticConfigRef conf
;
797 RGWElasticDataSyncModule(CephContext
*cct
, const JSONFormattable
& config
) : conf(std::make_shared
<ElasticConfig
>()) {
798 conf
->init(cct
, config
);
800 ~RGWElasticDataSyncModule() override
{}
802 void init(RGWDataSyncEnv
*sync_env
, uint64_t instance_id
) override
{
803 conf
->init_instance(sync_env
->store
->svc
.zone
->get_realm(), instance_id
);
806 RGWCoroutine
*init_sync(RGWDataSyncEnv
*sync_env
) override
{
807 ldout(sync_env
->cct
, 5) << conf
->id
<< ": init" << dendl
;
808 return new RGWElasticInitConfigCBCR(sync_env
, conf
);
810 RGWCoroutine
*sync_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, std::optional
<uint64_t> versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
811 ldout(sync_env
->cct
, 10) << conf
->id
<< ": sync_object: b=" << bucket_info
.bucket
<< " k=" << key
<< " versioned_epoch=" << versioned_epoch
.value_or(0) << dendl
;
812 if (!conf
->should_handle_operation(bucket_info
)) {
813 ldout(sync_env
->cct
, 10) << conf
->id
<< ": skipping operation (bucket not approved)" << dendl
;
816 return new RGWElasticHandleRemoteObjCR(sync_env
, bucket_info
, key
, conf
, versioned_epoch
.value_or(0));
818 RGWCoroutine
*remove_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
819 /* versioned and versioned epoch params are useless in the elasticsearch backend case */
820 ldout(sync_env
->cct
, 10) << conf
->id
<< ": rm_object: b=" << bucket_info
.bucket
<< " k=" << key
<< " mtime=" << mtime
<< " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
821 if (!conf
->should_handle_operation(bucket_info
)) {
822 ldout(sync_env
->cct
, 10) << conf
->id
<< ": skipping operation (bucket not approved)" << dendl
;
825 return new RGWElasticRemoveRemoteObjCBCR(sync_env
, bucket_info
, key
, mtime
, conf
);
827 RGWCoroutine
*create_delete_marker(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
,
828 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
829 ldout(sync_env
->cct
, 10) << conf
->id
<< ": create_delete_marker: b=" << bucket_info
.bucket
<< " k=" << key
<< " mtime=" << mtime
830 << " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
831 ldout(sync_env
->cct
, 10) << conf
->id
<< ": skipping operation (not handled)" << dendl
;
834 RGWRESTConn
*get_rest_conn() {
835 return conf
->conn
.get();
838 string
get_index_path() {
839 return conf
->get_index_path();
842 map
<string
, string
>& get_request_headers() {
843 return conf
->get_request_headers();
847 RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext
*cct
, const JSONFormattable
& config
)
849 data_handler
= std::unique_ptr
<RGWElasticDataSyncModule
>(new RGWElasticDataSyncModule(cct
, config
));
852 RGWDataSyncModule
*RGWElasticSyncModuleInstance::get_data_handler()
854 return data_handler
.get();
857 RGWRESTConn
*RGWElasticSyncModuleInstance::get_rest_conn()
859 return data_handler
->get_rest_conn();
862 string
RGWElasticSyncModuleInstance::get_index_path() {
863 return data_handler
->get_index_path();
866 map
<string
, string
>& RGWElasticSyncModuleInstance::get_request_headers() {
867 return data_handler
->get_request_headers();
870 RGWRESTMgr
*RGWElasticSyncModuleInstance::get_rest_filter(int dialect
, RGWRESTMgr
*orig
) {
871 if (dialect
!= RGW_REST_S3
) {
875 return new RGWRESTMgr_MDSearch_S3();
878 int RGWElasticSyncModule::create_instance(CephContext
*cct
, const JSONFormattable
& config
, RGWSyncModuleInstanceRef
*instance
) {
879 string endpoint
= config
["endpoint"];
880 instance
->reset(new RGWElasticSyncModuleInstance(cct
, config
));