1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 #include "rgw_common.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_sync_module.h"
8 #include "rgw_data_sync.h"
9 #include "rgw_sync_module_es.h"
10 #include "rgw_sync_module_es_rest.h"
11 #include "rgw_rest_conn.h"
12 #include "rgw_cr_rest.h"
14 #include "rgw_es_query.h"
17 #include "services/svc_zone.h"
19 #include "include/str_list.h"
21 #include <boost/asio/yield.hpp>
23 #define dout_subsys ceph_subsys_rgw
27 * whitelist utility. Config string is a list of entries, where an entry is either an item,
28 * a prefix, or a suffix. An item would be the name of the entity that we'd look up,
29 * a prefix would be a string ending with an asterisk, a suffix would be a string starting
30 * with an asterisk. For example:
32 * bucket1, bucket2, foo*, *bar
35 bool approve_all
{false};
41 void parse(const string
& str
) {
44 get_str_list(str
, ",", l
);
46 for (auto& entry
: l
) {
47 entry
= rgw_trim_whitespace(entry
);
57 if (entry
[0] == '*') {
58 suffixes
.insert(entry
.substr(1));
62 if (entry
.back() == '*') {
63 prefixes
.insert(entry
.substr(0, entry
.size() - 1));
67 entries
.insert(entry
);
73 void init(const string
& str
, bool def_val
) {
75 approve_all
= def_val
;
81 bool exists(const string
& entry
) {
86 if (entries
.find(entry
) != entries
.end()) {
90 auto i
= prefixes
.upper_bound(entry
);
91 if (i
!= prefixes
.begin()) {
93 if (boost::algorithm::starts_with(entry
, *i
)) {
98 for (i
= suffixes
.begin(); i
!= suffixes
.end(); ++i
) {
99 if (boost::algorithm::ends_with(entry
, *i
)) {
/* Floor applied to the configured "num_shards" value. */
#define ES_NUM_SHARDS_MIN 5

/* Fallbacks passed to config["num_shards"] / config["num_replicas"]. */
#define ES_NUM_SHARDS_DEFAULT 16
#define ES_NUM_REPLICAS_DEFAULT 1

/* Elasticsearch server version as (major, minor); compared lexicographically. */
using ESVersion = std::pair<int, int>;
static constexpr ESVersion ES_V5{5, 0};
static constexpr ESVersion ES_V7{7, 0};
119 std::string cluster_name
;
120 std::string cluster_uuid
;
123 void decode_json(JSONObj
*obj
);
125 std::string
get_version_str(){
126 return std::to_string(version
.first
) + "." + std::to_string(version
.second
);
130 // simple wrapper structure to wrap the es version nested type
131 struct es_version_decoder
{
134 int parse_version(const std::string
& s
) {
136 int ret
= sscanf(s
.c_str(), "%d.%d", &major
, &minor
);
140 version
= std::make_pair(major
,minor
);
144 void decode_json(JSONObj
*obj
) {
146 JSONDecoder::decode_json("number",s
,obj
);
147 if (parse_version(s
) < 0)
148 throw JSONDecoder::err("Failed to parse ElasticVersion");
153 void ESInfo::decode_json(JSONObj
*obj
)
155 JSONDecoder::decode_json("name", name
, obj
);
156 JSONDecoder::decode_json("cluster_name", cluster_name
, obj
);
157 JSONDecoder::decode_json("cluster_uuid", cluster_uuid
, obj
);
158 es_version_decoder esv
;
159 JSONDecoder::decode_json("version", esv
, obj
);
160 version
= std::move(esv
.version
);
163 struct ElasticConfig
{
164 uint64_t sync_instance
{0};
167 std::unique_ptr
<RGWRESTConn
> conn
;
168 bool explicit_custom_meta
{true};
169 string override_index_path
;
170 ItemList index_buckets
;
171 ItemList allow_owners
;
172 uint32_t num_shards
{0};
173 uint32_t num_replicas
{0};
174 std::map
<string
,string
> default_headers
= {{ "Content-Type", "application/json" }};
177 void init(CephContext
*cct
, const JSONFormattable
& config
) {
178 string elastic_endpoint
= config
["endpoint"];
179 id
= string("elastic:") + elastic_endpoint
;
180 conn
.reset(new RGWRESTConn(cct
, nullptr, id
, { elastic_endpoint
}));
181 explicit_custom_meta
= config
["explicit_custom_meta"](true);
182 index_buckets
.init(config
["index_buckets_list"], true); /* approve all buckets by default */
183 allow_owners
.init(config
["approved_owners_list"], true); /* approve all bucket owners by default */
184 override_index_path
= config
["override_index_path"];
185 num_shards
= config
["num_shards"](ES_NUM_SHARDS_DEFAULT
);
186 if (num_shards
< ES_NUM_SHARDS_MIN
) {
187 num_shards
= ES_NUM_SHARDS_MIN
;
189 num_replicas
= config
["num_replicas"](ES_NUM_REPLICAS_DEFAULT
);
190 if (string user
= config
["username"], pw
= config
["password"];
191 !user
.empty() && !pw
.empty()) {
192 auto auth_string
= user
+ ":" + pw
;
193 default_headers
.emplace("AUTHORIZATION", "Basic " + rgw::to_base64(auth_string
));
198 void init_instance(const RGWRealm
& realm
, uint64_t instance_id
) {
199 sync_instance
= instance_id
;
201 if (!override_index_path
.empty()) {
202 index_path
= override_index_path
;
207 snprintf(buf
, sizeof(buf
), "-%08x", (uint32_t)(sync_instance
& 0xFFFFFFFF));
209 index_path
= "/rgw-" + realm
.get_name() + buf
;
212 string
get_index_path() {
216 map
<string
, string
>& get_request_headers() {
217 return default_headers
;
220 string
get_obj_path(const RGWBucketInfo
& bucket_info
, const rgw_obj_key
& key
) {
221 if (es_info
.version
>= ES_V7
) {
222 return index_path
+ "/_doc/" + url_encode(bucket_info
.bucket
.bucket_id
+ ":" + key
.name
+ ":" + (key
.instance
.empty() ? "null" : key
.instance
));
225 return index_path
+ "/object/" + url_encode(bucket_info
.bucket
.bucket_id
+ ":" + key
.name
+ ":" + (key
.instance
.empty() ? "null" : key
.instance
));
229 bool should_handle_operation(RGWBucketInfo
& bucket_info
) {
230 return index_buckets
.exists(bucket_info
.bucket
.name
) &&
231 allow_owners
.exists(bucket_info
.owner
.to_str());
235 using ElasticConfigRef
= std::shared_ptr
<ElasticConfig
>;
237 static const char *es_type_to_str(const ESType
& t
) {
239 case ESType::String
: return "string";
240 case ESType::Text
: return "text";
241 case ESType::Keyword
: return "keyword";
242 case ESType::Long
: return "long";
243 case ESType::Integer
: return "integer";
244 case ESType::Short
: return "short";
245 case ESType::Byte
: return "byte";
246 case ESType::Double
: return "double";
247 case ESType::Float
: return "float";
248 case ESType::Half_Float
: return "half_float";
249 case ESType::Scaled_Float
: return "scaled_float";
250 case ESType::Date
: return "date";
251 case ESType::Boolean
: return "boolean";
252 case ESType::Integer_Range
: return "integer_range";
253 case ESType::Float_Range
: return "float_range";
254 case ESType::Double_Range
: return "date_range";
255 case ESType::Date_Range
: return "date_range";
256 case ESType::Geo_Point
: return "geo_point";
257 case ESType::Ip
: return "ip";
265 const char *format
{nullptr};
266 std::optional
<bool> analyzed
;
268 es_type_v2(ESType et
) : estype(et
) {}
270 void dump(Formatter
*f
) const {
271 const char *type_str
= es_type_to_str(estype
);
272 encode_json("type", type_str
, f
);
274 encode_json("format", format
, f
);
277 auto is_analyzed
= analyzed
;
279 if (estype
== ESType::String
&&
285 encode_json("index", (is_analyzed
.value() ? "analyzed" : "not_analyzed"), f
);
292 const char *format
{nullptr};
293 std::optional
<bool> analyzed
;
294 std::optional
<bool> index
;
296 es_type_v5(ESType et
) : estype(et
) {}
298 void dump(Formatter
*f
) const {
300 if (estype
!= ESType::String
) {
303 bool is_analyzed
= analyzed
.value_or(false);
304 new_estype
= (is_analyzed
? ESType::Text
: ESType::Keyword
);
305 /* index = true; ... Not setting index=true, because that's the default,
306 * and dumping a boolean value *might* be a problem when backporting this
307 * because value might get quoted
311 const char *type_str
= es_type_to_str(new_estype
);
312 encode_json("type", type_str
, f
);
314 encode_json("format", format
, f
);
317 encode_json("index", index
.value(), f
);
/*
 * Fluent wrapper over a field-mapping flavour T (es_type_v2 / es_type_v5).
 * Each setter updates the corresponding field of T and returns *this for chaining.
 */
template <class T>
struct es_type : public T {
  es_type(T t) : T(t) {}

  es_type& set_format(const char *f) {
    this->format = f;
    return *this;
  }

  es_type& set_analyzed(bool a) {
    this->analyzed = a;
    return *this;
  }
};
337 struct es_index_mappings
{
338 ESVersion es_version
;
339 ESType string_type
{ESType::String
};
341 es_index_mappings(ESVersion esv
):es_version(esv
) {
344 es_type
<T
> est(ESType t
) const {
345 return es_type
<T
>(t
);
348 void dump_custom(const char *section
, ESType type
, const char *format
, Formatter
*f
) const {
349 f
->open_object_section(section
);
350 ::encode_json("type", "nested", f
);
351 f
->open_object_section("properties");
352 encode_json("name", est(string_type
), f
);
353 encode_json("value", est(type
).set_format(format
), f
);
354 f
->close_section(); // entry
355 f
->close_section(); // custom-string
358 void dump(Formatter
*f
) const {
359 if (es_version
<= ES_V7
)
360 f
->open_object_section("object");
361 f
->open_object_section("properties");
362 encode_json("bucket", est(string_type
), f
);
363 encode_json("name", est(string_type
), f
);
364 encode_json("instance", est(string_type
), f
);
365 encode_json("versioned_epoch", est(ESType::Long
), f
);
366 f
->open_object_section("meta");
367 f
->open_object_section("properties");
368 encode_json("cache_control", est(string_type
), f
);
369 encode_json("content_disposition", est(string_type
), f
);
370 encode_json("content_encoding", est(string_type
), f
);
371 encode_json("content_language", est(string_type
), f
);
372 encode_json("content_type", est(string_type
), f
);
373 encode_json("storage_class", est(string_type
), f
);
374 encode_json("etag", est(string_type
), f
);
375 encode_json("expires", est(string_type
), f
);
376 encode_json("mtime", est(ESType::Date
)
377 .set_format("strict_date_optional_time||epoch_millis"), f
);
378 encode_json("size", est(ESType::Long
), f
);
379 dump_custom("custom-string", string_type
, nullptr, f
);
380 dump_custom("custom-int", ESType::Long
, nullptr, f
);
381 dump_custom("custom-date", ESType::Date
, "strict_date_optional_time||epoch_millis", f
);
382 f
->close_section(); // properties
383 f
->close_section(); // meta
384 f
->close_section(); // properties
386 if (es_version
<= ES_V7
)
387 f
->close_section(); // object
391 struct es_index_settings
{
392 uint32_t num_replicas
;
395 es_index_settings(uint32_t _replicas
, uint32_t _shards
) : num_replicas(_replicas
), num_shards(_shards
) {}
397 void dump(Formatter
*f
) const {
398 encode_json("number_of_replicas", num_replicas
, f
);
399 encode_json("number_of_shards", num_shards
, f
);
403 struct es_index_config_base
{
404 virtual ~es_index_config_base() {}
405 virtual void dump(Formatter
*f
) const = 0;
409 struct es_index_config
: public es_index_config_base
{
410 es_index_settings settings
;
411 es_index_mappings
<T
> mappings
;
413 es_index_config(es_index_settings
& _s
, ESVersion esv
) : settings(_s
), mappings(esv
) {
416 void dump(Formatter
*f
) const {
417 encode_json("settings", settings
, f
);
418 encode_json("mappings", mappings
, f
);
422 static bool is_sys_attr(const std::string
& attr_name
){
423 static constexpr std::initializer_list
<const char*> rgw_sys_attrs
=
425 RGW_ATTR_SOURCE_ZONE
,
427 RGW_ATTR_TEMPURL_KEY1
,
428 RGW_ATTR_TEMPURL_KEY2
,
433 return std::find(rgw_sys_attrs
.begin(), rgw_sys_attrs
.end(), attr_name
) != rgw_sys_attrs
.end();
436 static size_t attr_len(const bufferlist
& val
)
438 size_t len
= val
.length();
439 if (len
&& val
[len
- 1] == '\0') {
446 struct es_obj_metadata
{
448 ElasticConfigRef es_conf
;
449 RGWBucketInfo bucket_info
;
451 ceph::real_time mtime
;
453 map
<string
, bufferlist
> attrs
;
454 uint64_t versioned_epoch
;
456 es_obj_metadata(CephContext
*_cct
, ElasticConfigRef _es_conf
, const RGWBucketInfo
& _bucket_info
,
457 const rgw_obj_key
& _key
, ceph::real_time
& _mtime
, uint64_t _size
,
458 map
<string
, bufferlist
>& _attrs
, uint64_t _versioned_epoch
) : cct(_cct
), es_conf(_es_conf
), bucket_info(_bucket_info
), key(_key
),
459 mtime(_mtime
), size(_size
), attrs(std::move(_attrs
)), versioned_epoch(_versioned_epoch
) {}
461 void dump(Formatter
*f
) const {
462 map
<string
, string
> out_attrs
;
463 map
<string
, string
> custom_meta
;
464 RGWAccessControlPolicy policy
;
465 set
<string
> permissions
;
468 for (auto i
: attrs
) {
469 const string
& attr_name
= i
.first
;
470 bufferlist
& val
= i
.second
;
472 if (!boost::algorithm::starts_with(attr_name
, RGW_ATTR_PREFIX
)) {
476 if (boost::algorithm::starts_with(attr_name
, RGW_ATTR_META_PREFIX
)) {
477 custom_meta
.emplace(attr_name
.substr(sizeof(RGW_ATTR_META_PREFIX
) - 1),
478 string(val
.c_str(), attr_len(val
)));
482 if (boost::algorithm::starts_with(attr_name
, RGW_ATTR_CRYPT_PREFIX
)) {
486 if (boost::algorithm::starts_with(attr_name
, RGW_ATTR_OLH_PREFIX
)) {
487 // skip versioned object olh info
491 if (attr_name
== RGW_ATTR_ACL
) {
493 auto i
= val
.cbegin();
495 } catch (buffer::error
& err
) {
496 ldout(cct
, 0) << "ERROR: failed to decode acl for " << bucket_info
.bucket
<< "/" << key
<< dendl
;
500 const RGWAccessControlList
& acl
= policy
.get_acl();
502 permissions
.insert(policy
.get_owner().get_id().to_str());
503 for (auto acliter
: acl
.get_grant_map()) {
504 const ACLGrant
& grant
= acliter
.second
;
505 if (grant
.get_type().get_type() == ACL_TYPE_CANON_USER
&&
506 ((uint32_t)grant
.get_permission().get_permissions() & RGW_PERM_READ
) != 0) {
508 if (grant
.get_id(user
)) {
509 permissions
.insert(user
.to_str());
513 } else if (attr_name
== RGW_ATTR_TAGS
) {
515 auto tags_bl
= val
.cbegin();
516 decode(obj_tags
, tags_bl
);
517 } catch (buffer::error
& err
) {
518 ldout(cct
,0) << "ERROR: failed to decode obj tags for "
519 << bucket_info
.bucket
<< "/" << key
<< dendl
;
522 } else if (attr_name
== RGW_ATTR_COMPRESSION
) {
523 RGWCompressionInfo cs_info
;
525 auto vals_bl
= val
.cbegin();
526 decode(cs_info
, vals_bl
);
527 } catch (buffer::error
& err
) {
528 ldout(cct
,0) << "ERROR: failed to decode compression attr for "
529 << bucket_info
.bucket
<< "/" << key
<< dendl
;
532 out_attrs
.emplace("compression",std::move(cs_info
.compression_type
));
534 if (!is_sys_attr(attr_name
)) {
535 out_attrs
.emplace(attr_name
.substr(sizeof(RGW_ATTR_PREFIX
) - 1),
536 std::string(val
.c_str(), attr_len(val
)));
540 ::encode_json("bucket", bucket_info
.bucket
.name
, f
);
541 ::encode_json("name", key
.name
, f
);
542 string instance
= key
.instance
;
543 if (instance
.empty())
545 ::encode_json("instance", instance
, f
);
546 ::encode_json("versioned_epoch", versioned_epoch
, f
);
547 ::encode_json("owner", policy
.get_owner(), f
);
548 ::encode_json("permissions", permissions
, f
);
549 f
->open_object_section("meta");
550 ::encode_json("size", size
, f
);
553 rgw_to_iso8601(mtime
, &mtime_str
);
554 ::encode_json("mtime", mtime_str
, f
);
555 for (auto i
: out_attrs
) {
556 ::encode_json(i
.first
.c_str(), i
.second
, f
);
558 map
<string
, string
> custom_str
;
559 map
<string
, string
> custom_int
;
560 map
<string
, string
> custom_date
;
562 for (auto i
: custom_meta
) {
563 auto config
= bucket_info
.mdsearch_config
.find(i
.first
);
564 if (config
== bucket_info
.mdsearch_config
.end()) {
565 if (!es_conf
->explicit_custom_meta
) {
566 /* default custom meta is of type string */
567 custom_str
[i
.first
] = i
.second
;
569 ldout(cct
, 20) << "custom meta entry key=" << i
.first
<< " not found in bucket mdsearch config: " << bucket_info
.mdsearch_config
<< dendl
;
573 switch (config
->second
) {
574 case ESEntityTypeMap::ES_ENTITY_DATE
:
575 custom_date
[i
.first
] = i
.second
;
577 case ESEntityTypeMap::ES_ENTITY_INT
:
578 custom_int
[i
.first
] = i
.second
;
581 custom_str
[i
.first
] = i
.second
;
585 if (!custom_str
.empty()) {
586 f
->open_array_section("custom-string");
587 for (auto i
: custom_str
) {
588 f
->open_object_section("entity");
589 ::encode_json("name", i
.first
.c_str(), f
);
590 ::encode_json("value", i
.second
, f
);
595 if (!custom_int
.empty()) {
596 f
->open_array_section("custom-int");
597 for (auto i
: custom_int
) {
598 f
->open_object_section("entity");
599 ::encode_json("name", i
.first
.c_str(), f
);
600 ::encode_json("value", i
.second
, f
);
605 if (!custom_date
.empty()) {
606 f
->open_array_section("custom-date");
607 for (auto i
: custom_date
) {
609 * try to exlicitly parse date field, otherwise elasticsearch could reject the whole doc,
610 * which will end up with failed sync
613 int r
= parse_time(i
.second
.c_str(), &t
);
615 ldout(cct
, 20) << __func__
<< "(): failed to parse time (" << i
.second
<< "), skipping encoding of custom date attribute" << dendl
;
620 rgw_to_iso8601(t
, &time_str
);
622 f
->open_object_section("entity");
623 ::encode_json("name", i
.first
.c_str(), f
);
624 ::encode_json("value", time_str
.c_str(), f
);
629 f
->close_section(); // meta
630 const auto& m
= obj_tags
.get_tags();
632 f
->open_array_section("tagging");
633 for (const auto &it
: m
) {
634 f
->open_object_section("tag");
635 ::encode_json("key", it
.first
, f
);
636 ::encode_json("value",it
.second
, f
);
639 f
->close_section(); // tagging
644 class RGWElasticGetESInfoCBCR
: public RGWCoroutine
{
646 RGWElasticGetESInfoCBCR(RGWDataSyncCtx
*_sc
,
647 ElasticConfigRef _conf
) : RGWCoroutine(_sc
->cct
),
648 sc(_sc
), sync_env(_sc
->env
),
650 int operate() override
{
652 ldout(sync_env
->cct
, 5) << conf
->id
<< ": get elasticsearch info for zone: " << sc
->source_zone
<< dendl
;
653 yield
call(new RGWReadRESTResourceCR
<ESInfo
> (sync_env
->cct
,
655 sync_env
->http_manager
,
656 "/", nullptr /*params*/,
657 &(conf
->default_headers
),
660 ldout(sync_env
->cct
, 5) << conf
->id
<< ": get elasticsearch failed: " << retcode
<< dendl
;
661 return set_cr_error(retcode
);
664 ldout(sync_env
->cct
, 5) << conf
->id
<< ": got elastic version=" << conf
->es_info
.get_version_str() << dendl
;
665 return set_cr_done();
671 RGWDataSyncEnv
*sync_env
;
672 ElasticConfigRef conf
;
675 class RGWElasticPutIndexCBCR
: public RGWCoroutine
{
677 RGWElasticPutIndexCBCR(RGWDataSyncCtx
*_sc
,
678 ElasticConfigRef _conf
) : RGWCoroutine(_sc
->cct
),
679 sc(_sc
), sync_env(_sc
->env
),
681 int operate() override
{
683 ldout(sc
->cct
, 5) << conf
->id
<< ": put elasticsearch index for zone: " << sc
->source_zone
<< dendl
;
686 string path
= conf
->get_index_path();
687 es_index_settings
settings(conf
->num_replicas
, conf
->num_shards
);
688 std::unique_ptr
<es_index_config_base
> index_conf
;
690 if (conf
->es_info
.version
>= ES_V5
) {
691 ldout(sc
->cct
, 0) << "elasticsearch: index mapping: version >= 5" << dendl
;
692 index_conf
.reset(new es_index_config
<es_type_v5
>(settings
, conf
->es_info
.version
));
694 ldout(sc
->cct
, 0) << "elasticsearch: index mapping: version < 5" << dendl
;
695 index_conf
.reset(new es_index_config
<es_type_v2
>(settings
, conf
->es_info
.version
));
697 call(new RGWPutRESTResourceCR
<es_index_config_base
, int, _err_response
> (sc
->cct
,
699 sync_env
->http_manager
,
700 path
, nullptr /*params*/,
701 &(conf
->default_headers
),
702 *index_conf
, nullptr, &err_response
));
706 if (err_response
.error
.type
!= "index_already_exists_exception" &&
707 err_response
.error
.type
!= "resource_already_exists_exception") {
708 ldout(sync_env
->cct
, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response
.error
.type
<< " response.reason=" << err_response
.error
.reason
<< dendl
;
709 return set_cr_error(retcode
);
712 ldout(sync_env
->cct
, 0) << "elasticsearch: index already exists, assuming external initialization" << dendl
;
714 return set_cr_done();
721 RGWDataSyncEnv
*sync_env
;
722 ElasticConfigRef conf
;
724 struct _err_response
{
726 vector
<err_reason
> root_cause
;
731 void decode_json(JSONObj
*obj
) {
732 JSONDecoder::decode_json("root_cause", root_cause
, obj
);
733 JSONDecoder::decode_json("type", type
, obj
);
734 JSONDecoder::decode_json("reason", reason
, obj
);
735 JSONDecoder::decode_json("index", index
, obj
);
739 void decode_json(JSONObj
*obj
) {
740 JSONDecoder::decode_json("error", error
, obj
);
745 class RGWElasticInitConfigCBCR
: public RGWCoroutine
{
747 RGWDataSyncEnv
*sync_env
;
748 ElasticConfigRef conf
;
751 RGWElasticInitConfigCBCR(RGWDataSyncCtx
*_sc
,
752 ElasticConfigRef _conf
) : RGWCoroutine(_sc
->cct
),
753 sc(_sc
), sync_env(_sc
->env
),
755 int operate() override
{
758 yield
call(new RGWElasticGetESInfoCBCR(sc
, conf
));
761 return set_cr_error(retcode
);
764 yield
call(new RGWElasticPutIndexCBCR(sc
, conf
));
766 return set_cr_error(retcode
);
768 return set_cr_done();
775 class RGWElasticHandleRemoteObjCBCR
: public RGWStatRemoteObjCBCR
{
776 rgw_bucket_sync_pipe sync_pipe
;
777 ElasticConfigRef conf
;
778 uint64_t versioned_epoch
;
780 RGWElasticHandleRemoteObjCBCR(RGWDataSyncCtx
*_sc
,
781 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
,
782 ElasticConfigRef _conf
, uint64_t _versioned_epoch
) : RGWStatRemoteObjCBCR(_sc
, _sync_pipe
.info
.source_bs
.bucket
, _key
),
783 sync_pipe(_sync_pipe
), conf(_conf
),
784 versioned_epoch(_versioned_epoch
) {}
785 int operate() override
{
787 ldout(sync_env
->cct
, 10) << ": stat of remote obj: z=" << sc
->source_zone
788 << " b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
789 << " size=" << size
<< " mtime=" << mtime
<< dendl
;
792 string path
= conf
->get_obj_path(sync_pipe
.dest_bucket_info
, key
);
793 es_obj_metadata
doc(sync_env
->cct
, conf
, sync_pipe
.dest_bucket_info
, key
, mtime
, size
, attrs
, versioned_epoch
);
795 call(new RGWPutRESTResourceCR
<es_obj_metadata
, int>(sync_env
->cct
, conf
->conn
.get(),
796 sync_env
->http_manager
,
797 path
, nullptr /* params */,
798 &(conf
->default_headers
),
799 doc
, nullptr /* result */));
803 return set_cr_error(retcode
);
805 return set_cr_done();
811 class RGWElasticHandleRemoteObjCR
: public RGWCallStatRemoteObjCR
{
812 rgw_bucket_sync_pipe sync_pipe
;
813 ElasticConfigRef conf
;
814 uint64_t versioned_epoch
;
816 RGWElasticHandleRemoteObjCR(RGWDataSyncCtx
*_sc
,
817 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
,
818 ElasticConfigRef _conf
, uint64_t _versioned_epoch
) : RGWCallStatRemoteObjCR(_sc
, _sync_pipe
.info
.source_bs
.bucket
, _key
),
819 sync_pipe(_sync_pipe
),
820 conf(_conf
), versioned_epoch(_versioned_epoch
) {
823 ~RGWElasticHandleRemoteObjCR() override
{}
825 RGWStatRemoteObjCBCR
*allocate_callback() override
{
826 return new RGWElasticHandleRemoteObjCBCR(sc
, sync_pipe
, key
, conf
, versioned_epoch
);
830 class RGWElasticRemoveRemoteObjCBCR
: public RGWCoroutine
{
832 RGWDataSyncEnv
*sync_env
;
833 rgw_bucket_sync_pipe sync_pipe
;
835 ceph::real_time mtime
;
836 ElasticConfigRef conf
;
838 RGWElasticRemoveRemoteObjCBCR(RGWDataSyncCtx
*_sc
,
839 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
, const ceph::real_time
& _mtime
,
840 ElasticConfigRef _conf
) : RGWCoroutine(_sc
->cct
), sc(_sc
), sync_env(_sc
->env
),
841 sync_pipe(_sync_pipe
), key(_key
),
842 mtime(_mtime
), conf(_conf
) {}
843 int operate() override
{
845 ldout(sync_env
->cct
, 10) << ": remove remote obj: z=" << sc
->source_zone
846 << " b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " mtime=" << mtime
<< dendl
;
848 string path
= conf
->get_obj_path(sync_pipe
.dest_bucket_info
, key
);
850 call(new RGWDeleteRESTResourceCR(sync_env
->cct
, conf
->conn
.get(),
851 sync_env
->http_manager
,
852 path
, nullptr /* params */));
855 return set_cr_error(retcode
);
857 return set_cr_done();
864 class RGWElasticDataSyncModule
: public RGWDataSyncModule
{
865 ElasticConfigRef conf
;
867 RGWElasticDataSyncModule(CephContext
*cct
, const JSONFormattable
& config
) : conf(std::make_shared
<ElasticConfig
>()) {
868 conf
->init(cct
, config
);
870 ~RGWElasticDataSyncModule() override
{}
872 void init(RGWDataSyncCtx
*sc
, uint64_t instance_id
) override
{
873 conf
->init_instance(sc
->env
->svc
->zone
->get_realm(), instance_id
);
876 RGWCoroutine
*init_sync(RGWDataSyncCtx
*sc
) override
{
877 ldout(sc
->cct
, 5) << conf
->id
<< ": init" << dendl
;
878 return new RGWElasticInitConfigCBCR(sc
, conf
);
881 RGWCoroutine
*start_sync(RGWDataSyncCtx
*sc
) override
{
882 ldout(sc
->cct
, 5) << conf
->id
<< ": start_sync" << dendl
;
883 // try to get elastic search version
884 return new RGWElasticGetESInfoCBCR(sc
, conf
);
887 RGWCoroutine
*sync_object(RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
, rgw_obj_key
& key
, std::optional
<uint64_t> versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
888 ldout(sc
->cct
, 10) << conf
->id
<< ": sync_object: b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " versioned_epoch=" << versioned_epoch
.value_or(0) << dendl
;
889 if (!conf
->should_handle_operation(sync_pipe
.dest_bucket_info
)) {
890 ldout(sc
->cct
, 10) << conf
->id
<< ": skipping operation (bucket not approved)" << dendl
;
893 return new RGWElasticHandleRemoteObjCR(sc
, sync_pipe
, key
, conf
, versioned_epoch
.value_or(0));
895 RGWCoroutine
*remove_object(RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
, rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
896 /* versioned and versioned epoch params are useless in the elasticsearch backend case */
897 ldout(sc
->cct
, 10) << conf
->id
<< ": rm_object: b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " mtime=" << mtime
<< " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
898 if (!conf
->should_handle_operation(sync_pipe
.dest_bucket_info
)) {
899 ldout(sc
->cct
, 10) << conf
->id
<< ": skipping operation (bucket not approved)" << dendl
;
902 return new RGWElasticRemoveRemoteObjCBCR(sc
, sync_pipe
, key
, mtime
, conf
);
904 RGWCoroutine
*create_delete_marker(RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
, rgw_obj_key
& key
, real_time
& mtime
,
905 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
906 ldout(sc
->cct
, 10) << conf
->id
<< ": create_delete_marker: b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " mtime=" << mtime
907 << " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
908 ldout(sc
->cct
, 10) << conf
->id
<< ": skipping operation (not handled)" << dendl
;
911 RGWRESTConn
*get_rest_conn() {
912 return conf
->conn
.get();
915 string
get_index_path() {
916 return conf
->get_index_path();
919 map
<string
, string
>& get_request_headers() {
920 return conf
->get_request_headers();
924 RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext
*cct
, const JSONFormattable
& config
)
926 data_handler
= std::unique_ptr
<RGWElasticDataSyncModule
>(new RGWElasticDataSyncModule(cct
, config
));
929 RGWDataSyncModule
*RGWElasticSyncModuleInstance::get_data_handler()
931 return data_handler
.get();
934 RGWRESTConn
*RGWElasticSyncModuleInstance::get_rest_conn()
936 return data_handler
->get_rest_conn();
939 string
RGWElasticSyncModuleInstance::get_index_path() {
940 return data_handler
->get_index_path();
943 map
<string
, string
>& RGWElasticSyncModuleInstance::get_request_headers() {
944 return data_handler
->get_request_headers();
947 RGWRESTMgr
*RGWElasticSyncModuleInstance::get_rest_filter(int dialect
, RGWRESTMgr
*orig
) {
948 if (dialect
!= RGW_REST_S3
) {
952 return new RGWRESTMgr_MDSearch_S3();
955 int RGWElasticSyncModule::create_instance(CephContext
*cct
, const JSONFormattable
& config
, RGWSyncModuleInstanceRef
*instance
) {
956 string endpoint
= config
["endpoint"];
957 instance
->reset(new RGWElasticSyncModuleInstance(cct
, config
));