1 #include "rgw_common.h"
2 #include "rgw_coroutine.h"
3 #include "rgw_sync_module.h"
4 #include "rgw_data_sync.h"
5 #include "rgw_sync_module_es.h"
6 #include "rgw_sync_module_es_rest.h"
7 #include "rgw_rest_conn.h"
8 #include "rgw_cr_rest.h"
10 #include "rgw_es_query.h"
12 #include "include/str_list.h"
14 #include <boost/asio/yield.hpp>
16 #define dout_subsys ceph_subsys_rgw
20 * whitelist utility. Config string is a list of entries, where an entry is either an item,
21 * a prefix, or a suffix. An item would be the name of the entity that we'd look up,
22 * a prefix would be a string ending with an asterisk, a suffix would be a string starting
23 * with an asterisk. For example:
25 * bucket1, bucket2, foo*, *bar
28 bool approve_all
{false};
34 void parse(const string
& str
) {
37 get_str_list(str
, ",", l
);
39 for (auto& entry
: l
) {
40 entry
= rgw_trim_whitespace(entry
);
50 if (entry
[0] == '*') {
51 suffixes
.insert(entry
.substr(1));
55 if (entry
.back() == '*') {
56 prefixes
.insert(entry
.substr(0, entry
.size() - 1));
60 entries
.insert(entry
);
66 void init(const string
& str
, bool def_val
) {
68 approve_all
= def_val
;
74 bool exists(const string
& entry
) {
79 if (entries
.find(entry
) != entries
.end()) {
83 auto i
= prefixes
.upper_bound(entry
);
84 if (i
!= prefixes
.begin()) {
86 if (boost::algorithm::starts_with(entry
, *i
)) {
91 for (i
= suffixes
.begin(); i
!= suffixes
.end(); ++i
) {
92 if (boost::algorithm::ends_with(entry
, *i
)) {
101 #define ES_NUM_SHARDS_MIN 5
103 #define ES_NUM_SHARDS_DEFAULT 16
104 #define ES_NUM_REPLICAS_DEFAULT 1
106 struct ElasticConfig
{
107 uint64_t sync_instance
{0};
110 std::unique_ptr
<RGWRESTConn
> conn
;
111 bool explicit_custom_meta
{true};
112 string override_index_path
;
113 ItemList index_buckets
;
114 ItemList allow_owners
;
115 uint32_t num_shards
{0};
116 uint32_t num_replicas
{0};
118 void init(CephContext
*cct
, const map
<string
, string
, ltstr_nocase
>& config
) {
119 string elastic_endpoint
= rgw_conf_get(config
, "endpoint", "");
120 id
= string("elastic:") + elastic_endpoint
;
121 conn
.reset(new RGWRESTConn(cct
, nullptr, id
, { elastic_endpoint
}));
122 explicit_custom_meta
= rgw_conf_get_bool(config
, "explicit_custom_meta", true);
123 index_buckets
.init(rgw_conf_get(config
, "index_buckets_list", ""), true); /* approve all buckets by default */
124 allow_owners
.init(rgw_conf_get(config
, "approved_owners_list", ""), true); /* approve all bucket owners by default */
125 override_index_path
= rgw_conf_get(config
, "override_index_path", "");
126 num_shards
= rgw_conf_get_int(config
, "num_shards", ES_NUM_SHARDS_DEFAULT
);
127 if (num_shards
< ES_NUM_SHARDS_MIN
) {
128 num_shards
= ES_NUM_SHARDS_MIN
;
130 num_replicas
= rgw_conf_get_int(config
, "num_replicas", ES_NUM_REPLICAS_DEFAULT
);
133 void init_instance(RGWRealm
& realm
, uint64_t instance_id
) {
134 sync_instance
= instance_id
;
136 if (!override_index_path
.empty()) {
137 index_path
= override_index_path
;
142 snprintf(buf
, sizeof(buf
), "-%08x", (uint32_t)(sync_instance
& 0xFFFFFFFF));
144 index_path
= "/rgw-" + realm
.get_name() + buf
;
147 string
get_index_path() {
151 string
get_obj_path(const RGWBucketInfo
& bucket_info
, const rgw_obj_key
& key
) {
152 return index_path
+ "/object/" + bucket_info
.bucket
.bucket_id
+ ":" + key
.name
+ ":" + (key
.instance
.empty() ? "null" : key
.instance
);
155 bool should_handle_operation(RGWBucketInfo
& bucket_info
) {
156 return index_buckets
.exists(bucket_info
.bucket
.name
) &&
157 allow_owners
.exists(bucket_info
.owner
.to_str());
161 using ElasticConfigRef
= std::shared_ptr
<ElasticConfig
>;
163 struct es_dump_type
{
168 es_dump_type(const char *t
, const char *f
= nullptr, bool a
= false) : type(t
), format(f
), analyzed(a
) {}
170 void dump(Formatter
*f
) const {
171 encode_json("type", type
, f
);
173 encode_json("format", format
, f
);
175 if (!analyzed
&& strcmp(type
, "string") == 0) {
176 encode_json("index", "not_analyzed", f
);
181 struct es_index_mappings
{
182 void dump_custom(Formatter
*f
, const char *section
, const char *type
, const char *format
) const {
183 f
->open_object_section(section
);
184 ::encode_json("type", "nested", f
);
185 f
->open_object_section("properties");
186 encode_json("name", es_dump_type("string"), f
);
187 encode_json("value", es_dump_type(type
, format
), f
);
188 f
->close_section(); // entry
189 f
->close_section(); // custom-string
191 void dump(Formatter
*f
) const {
192 f
->open_object_section("object");
193 f
->open_object_section("properties");
194 encode_json("bucket", es_dump_type("string"), f
);
195 encode_json("name", es_dump_type("string"), f
);
196 encode_json("instance", es_dump_type("string"), f
);
197 encode_json("versioned_epoch", es_dump_type("long"), f
);
198 f
->open_object_section("meta");
199 f
->open_object_section("properties");
200 encode_json("cache_control", es_dump_type("string"), f
);
201 encode_json("content_disposition", es_dump_type("string"), f
);
202 encode_json("content_encoding", es_dump_type("string"), f
);
203 encode_json("content_language", es_dump_type("string"), f
);
204 encode_json("content_type", es_dump_type("string"), f
);
205 encode_json("etag", es_dump_type("string"), f
);
206 encode_json("expires", es_dump_type("string"), f
);
207 f
->open_object_section("mtime");
208 ::encode_json("type", "date", f
);
209 ::encode_json("format", "strict_date_optional_time||epoch_millis", f
);
210 f
->close_section(); // mtime
211 encode_json("size", es_dump_type("long"), f
);
212 dump_custom(f
, "custom-string", "string", nullptr);
213 dump_custom(f
, "custom-int", "long", nullptr);
214 dump_custom(f
, "custom-date", "date", "strict_date_optional_time||epoch_millis");
215 f
->close_section(); // properties
216 f
->close_section(); // meta
217 f
->close_section(); // properties
218 f
->close_section(); // object
222 struct es_index_settings
{
223 uint32_t num_replicas
;
226 es_index_settings(uint32_t _replicas
, uint32_t _shards
) : num_replicas(_replicas
), num_shards(_shards
) {}
228 void dump(Formatter
*f
) const {
229 encode_json("number_of_replicas", num_replicas
, f
);
230 encode_json("number_of_shards", num_shards
, f
);
234 struct es_index_config
{
235 es_index_settings settings
;
236 es_index_mappings mappings
;
238 es_index_config(es_index_settings
& _s
, es_index_mappings
& _m
) : settings(_s
), mappings(_m
) {}
240 void dump(Formatter
*f
) const {
241 encode_json("settings", settings
, f
);
242 encode_json("mappings", mappings
, f
);
246 struct es_obj_metadata
{
248 ElasticConfigRef es_conf
;
249 RGWBucketInfo bucket_info
;
251 ceph::real_time mtime
;
253 map
<string
, bufferlist
> attrs
;
254 uint64_t versioned_epoch
;
256 es_obj_metadata(CephContext
*_cct
, ElasticConfigRef _es_conf
, const RGWBucketInfo
& _bucket_info
,
257 const rgw_obj_key
& _key
, ceph::real_time
& _mtime
, uint64_t _size
,
258 map
<string
, bufferlist
>& _attrs
, uint64_t _versioned_epoch
) : cct(_cct
), es_conf(_es_conf
), bucket_info(_bucket_info
), key(_key
),
259 mtime(_mtime
), size(_size
), attrs(std::move(_attrs
)), versioned_epoch(_versioned_epoch
) {}
261 void dump(Formatter
*f
) const {
262 map
<string
, string
> out_attrs
;
263 map
<string
, string
> custom_meta
;
264 RGWAccessControlPolicy policy
;
265 set
<string
> permissions
;
268 for (auto i
: attrs
) {
269 const string
& attr_name
= i
.first
;
271 bufferlist
& val
= i
.second
;
273 if (attr_name
.compare(0, sizeof(RGW_ATTR_PREFIX
) - 1, RGW_ATTR_PREFIX
) != 0) {
277 if (attr_name
.compare(0, sizeof(RGW_ATTR_META_PREFIX
) - 1, RGW_ATTR_META_PREFIX
) == 0) {
278 name
= attr_name
.substr(sizeof(RGW_ATTR_META_PREFIX
) - 1);
279 custom_meta
[name
] = string(val
.c_str(), (val
.length() > 0 ? val
.length() - 1 : 0));
283 name
= attr_name
.substr(sizeof(RGW_ATTR_PREFIX
) - 1);
287 auto i
= val
.begin();
289 } catch (buffer::error
& err
) {
290 ldout(cct
, 0) << "ERROR: failed to decode acl for " << bucket_info
.bucket
<< "/" << key
<< dendl
;
293 const RGWAccessControlList
& acl
= policy
.get_acl();
295 permissions
.insert(policy
.get_owner().get_id().to_str());
296 for (auto acliter
: acl
.get_grant_map()) {
297 const ACLGrant
& grant
= acliter
.second
;
298 if (grant
.get_type().get_type() == ACL_TYPE_CANON_USER
&&
299 ((uint32_t)grant
.get_permission().get_permissions() & RGW_PERM_READ
) != 0) {
301 if (grant
.get_id(user
)) {
302 permissions
.insert(user
.to_str());
306 } else if (name
== "x-amz-tagging") {
307 auto tags_bl
= val
.begin();
308 ::decode(obj_tags
, tags_bl
);
309 } else if (name
== "compression") {
310 RGWCompressionInfo cs_info
;
311 auto vals_bl
= val
.begin();
312 decode(cs_info
, vals_bl
);
313 out_attrs
[name
] = cs_info
.compression_type
;
315 if (name
!= "pg_ver" &&
316 name
!= "source_zone" &&
318 out_attrs
[name
] = string(val
.c_str(), (val
.length() > 0 ? val
.length() - 1 : 0));
322 ::encode_json("bucket", bucket_info
.bucket
.name
, f
);
323 ::encode_json("name", key
.name
, f
);
324 ::encode_json("instance", key
.instance
, f
);
325 ::encode_json("versioned_epoch", versioned_epoch
, f
);
326 ::encode_json("owner", policy
.get_owner(), f
);
327 ::encode_json("permissions", permissions
, f
);
328 f
->open_object_section("meta");
329 ::encode_json("size", size
, f
);
332 rgw_to_iso8601(mtime
, &mtime_str
);
333 ::encode_json("mtime", mtime_str
, f
);
334 for (auto i
: out_attrs
) {
335 ::encode_json(i
.first
.c_str(), i
.second
, f
);
337 map
<string
, string
> custom_str
;
338 map
<string
, string
> custom_int
;
339 map
<string
, string
> custom_date
;
341 for (auto i
: custom_meta
) {
342 auto config
= bucket_info
.mdsearch_config
.find(i
.first
);
343 if (config
== bucket_info
.mdsearch_config
.end()) {
344 if (!es_conf
->explicit_custom_meta
) {
345 /* default custom meta is of type string */
346 custom_str
[i
.first
] = i
.second
;
348 ldout(cct
, 20) << "custom meta entry key=" << i
.first
<< " not found in bucket mdsearch config: " << bucket_info
.mdsearch_config
<< dendl
;
352 switch (config
->second
) {
353 case ESEntityTypeMap::ES_ENTITY_DATE
:
354 custom_date
[i
.first
] = i
.second
;
356 case ESEntityTypeMap::ES_ENTITY_INT
:
357 custom_int
[i
.first
] = i
.second
;
360 custom_str
[i
.first
] = i
.second
;
364 if (!custom_str
.empty()) {
365 f
->open_array_section("custom-string");
366 for (auto i
: custom_str
) {
367 f
->open_object_section("entity");
368 ::encode_json("name", i
.first
.c_str(), f
);
369 ::encode_json("value", i
.second
, f
);
374 if (!custom_int
.empty()) {
375 f
->open_array_section("custom-int");
376 for (auto i
: custom_int
) {
377 f
->open_object_section("entity");
378 ::encode_json("name", i
.first
.c_str(), f
);
379 ::encode_json("value", i
.second
, f
);
384 if (!custom_date
.empty()) {
385 f
->open_array_section("custom-date");
386 for (auto i
: custom_date
) {
388 * try to exlicitly parse date field, otherwise elasticsearch could reject the whole doc,
389 * which will end up with failed sync
392 int r
= parse_time(i
.second
.c_str(), &t
);
394 ldout(cct
, 20) << __func__
<< "(): failed to parse time (" << i
.second
<< "), skipping encoding of custom date attribute" << dendl
;
399 rgw_to_iso8601(t
, &time_str
);
401 f
->open_object_section("entity");
402 ::encode_json("name", i
.first
.c_str(), f
);
403 ::encode_json("value", time_str
.c_str(), f
);
408 f
->close_section(); // meta
409 const auto& m
= obj_tags
.get_tags();
411 f
->open_array_section("tagging");
412 for (const auto &it
: m
) {
413 f
->open_object_section("tag");
414 ::encode_json("key", it
.first
, f
);
415 ::encode_json("value",it
.second
, f
);
418 f
->close_section(); // tagging
423 class RGWElasticInitConfigCBCR
: public RGWCoroutine
{
424 RGWDataSyncEnv
*sync_env
;
425 ElasticConfigRef conf
;
427 RGWElasticInitConfigCBCR(RGWDataSyncEnv
*_sync_env
,
428 ElasticConfigRef _conf
) : RGWCoroutine(_sync_env
->cct
),
431 int operate() override
{
433 ldout(sync_env
->cct
, 0) << ": init elasticsearch config zone=" << sync_env
->source_zone
<< dendl
;
435 string path
= conf
->get_index_path();
437 es_index_settings
settings(conf
->num_replicas
, conf
->num_shards
);
438 es_index_mappings mappings
;
440 es_index_config
index_conf(settings
, mappings
);
442 call(new RGWPutRESTResourceCR
<es_index_config
, int>(sync_env
->cct
, conf
->conn
.get(),
443 sync_env
->http_manager
,
444 path
, nullptr /* params */,
445 index_conf
, nullptr /* result */));
448 return set_cr_error(retcode
);
450 return set_cr_done();
457 class RGWElasticHandleRemoteObjCBCR
: public RGWStatRemoteObjCBCR
{
458 ElasticConfigRef conf
;
459 uint64_t versioned_epoch
;
461 RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv
*_sync_env
,
462 RGWBucketInfo
& _bucket_info
, rgw_obj_key
& _key
,
463 ElasticConfigRef _conf
, uint64_t _versioned_epoch
) : RGWStatRemoteObjCBCR(_sync_env
, _bucket_info
, _key
), conf(_conf
),
464 versioned_epoch(_versioned_epoch
) {}
465 int operate() override
{
467 ldout(sync_env
->cct
, 10) << ": stat of remote obj: z=" << sync_env
->source_zone
468 << " b=" << bucket_info
.bucket
<< " k=" << key
<< " size=" << size
<< " mtime=" << mtime
469 << " attrs=" << attrs
<< dendl
;
471 string path
= conf
->get_obj_path(bucket_info
, key
);
472 es_obj_metadata
doc(sync_env
->cct
, conf
, bucket_info
, key
, mtime
, size
, attrs
, versioned_epoch
);
474 call(new RGWPutRESTResourceCR
<es_obj_metadata
, int>(sync_env
->cct
, conf
->conn
.get(),
475 sync_env
->http_manager
,
476 path
, nullptr /* params */,
477 doc
, nullptr /* result */));
481 return set_cr_error(retcode
);
483 return set_cr_done();
489 class RGWElasticHandleRemoteObjCR
: public RGWCallStatRemoteObjCR
{
490 ElasticConfigRef conf
;
491 uint64_t versioned_epoch
;
493 RGWElasticHandleRemoteObjCR(RGWDataSyncEnv
*_sync_env
,
494 RGWBucketInfo
& _bucket_info
, rgw_obj_key
& _key
,
495 ElasticConfigRef _conf
, uint64_t _versioned_epoch
) : RGWCallStatRemoteObjCR(_sync_env
, _bucket_info
, _key
),
496 conf(_conf
), versioned_epoch(_versioned_epoch
) {
499 ~RGWElasticHandleRemoteObjCR() override
{}
501 RGWStatRemoteObjCBCR
*allocate_callback() override
{
502 return new RGWElasticHandleRemoteObjCBCR(sync_env
, bucket_info
, key
, conf
, versioned_epoch
);
506 class RGWElasticRemoveRemoteObjCBCR
: public RGWCoroutine
{
507 RGWDataSyncEnv
*sync_env
;
508 RGWBucketInfo bucket_info
;
510 ceph::real_time mtime
;
511 ElasticConfigRef conf
;
513 RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv
*_sync_env
,
514 RGWBucketInfo
& _bucket_info
, rgw_obj_key
& _key
, const ceph::real_time
& _mtime
,
515 ElasticConfigRef _conf
) : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
516 bucket_info(_bucket_info
), key(_key
),
517 mtime(_mtime
), conf(_conf
) {}
518 int operate() override
{
520 ldout(sync_env
->cct
, 10) << ": remove remote obj: z=" << sync_env
->source_zone
521 << " b=" << bucket_info
.bucket
<< " k=" << key
<< " mtime=" << mtime
<< dendl
;
523 string path
= conf
->get_obj_path(bucket_info
, key
);
525 call(new RGWDeleteRESTResourceCR(sync_env
->cct
, conf
->conn
.get(),
526 sync_env
->http_manager
,
527 path
, nullptr /* params */));
530 return set_cr_error(retcode
);
532 return set_cr_done();
539 class RGWElasticDataSyncModule
: public RGWDataSyncModule
{
540 ElasticConfigRef conf
;
542 RGWElasticDataSyncModule(CephContext
*cct
, const map
<string
, string
, ltstr_nocase
>& config
) : conf(std::make_shared
<ElasticConfig
>()) {
543 conf
->init(cct
, config
);
545 ~RGWElasticDataSyncModule() override
{}
547 void init(RGWDataSyncEnv
*sync_env
, uint64_t instance_id
) override
{
548 conf
->init_instance(sync_env
->store
->get_realm(), instance_id
);
551 RGWCoroutine
*init_sync(RGWDataSyncEnv
*sync_env
) override
{
552 ldout(sync_env
->cct
, 5) << conf
->id
<< ": init" << dendl
;
553 return new RGWElasticInitConfigCBCR(sync_env
, conf
);
555 RGWCoroutine
*sync_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
556 ldout(sync_env
->cct
, 10) << conf
->id
<< ": sync_object: b=" << bucket_info
.bucket
<< " k=" << key
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
557 if (!conf
->should_handle_operation(bucket_info
)) {
558 ldout(sync_env
->cct
, 10) << conf
->id
<< ": skipping operation (bucket not approved)" << dendl
;
561 return new RGWElasticHandleRemoteObjCR(sync_env
, bucket_info
, key
, conf
, versioned_epoch
);
563 RGWCoroutine
*remove_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
564 /* versioned and versioned epoch params are useless in the elasticsearch backend case */
565 ldout(sync_env
->cct
, 10) << conf
->id
<< ": rm_object: b=" << bucket_info
.bucket
<< " k=" << key
<< " mtime=" << mtime
<< " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
566 if (!conf
->should_handle_operation(bucket_info
)) {
567 ldout(sync_env
->cct
, 10) << conf
->id
<< ": skipping operation (bucket not approved)" << dendl
;
570 return new RGWElasticRemoveRemoteObjCBCR(sync_env
, bucket_info
, key
, mtime
, conf
);
572 RGWCoroutine
*create_delete_marker(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
,
573 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
574 ldout(sync_env
->cct
, 10) << conf
->id
<< ": create_delete_marker: b=" << bucket_info
.bucket
<< " k=" << key
<< " mtime=" << mtime
575 << " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
576 ldout(sync_env
->cct
, 10) << conf
->id
<< ": skipping operation (not handled)" << dendl
;
579 RGWRESTConn
*get_rest_conn() {
580 return conf
->conn
.get();
583 string
get_index_path() {
584 return conf
->get_index_path();
588 RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext
*cct
, const map
<string
, string
, ltstr_nocase
>& config
)
590 data_handler
= std::unique_ptr
<RGWElasticDataSyncModule
>(new RGWElasticDataSyncModule(cct
, config
));
593 RGWDataSyncModule
*RGWElasticSyncModuleInstance::get_data_handler()
595 return data_handler
.get();
598 RGWRESTConn
*RGWElasticSyncModuleInstance::get_rest_conn()
600 return data_handler
->get_rest_conn();
603 string
RGWElasticSyncModuleInstance::get_index_path() {
604 return data_handler
->get_index_path();
607 RGWRESTMgr
*RGWElasticSyncModuleInstance::get_rest_filter(int dialect
, RGWRESTMgr
*orig
) {
608 if (dialect
!= RGW_REST_S3
) {
612 return new RGWRESTMgr_MDSearch_S3();
615 int RGWElasticSyncModule::create_instance(CephContext
*cct
, map
<string
, string
, ltstr_nocase
>& config
, RGWSyncModuleInstanceRef
*instance
) {
617 auto i
= config
.find("endpoint");
618 if (i
!= config
.end()) {
619 endpoint
= i
->second
;
621 instance
->reset(new RGWElasticSyncModuleInstance(cct
, config
));