1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "svc_bilog_rados.h"
5 #include "svc_bi_rados.h"
7 #include "cls/rgw/cls_rgw_client.h"
10 #define dout_subsys ceph_subsys_rgw
// Constructor: forwards the CephContext pointer to the RGWServiceInstance
// base-class constructor. No other member initialization is visible here.
12 RGWSI_BILog_RADOS::RGWSI_BILog_RADOS(CephContext
*cct
) : RGWServiceInstance(cct
)
// Wires this service to the bucket-index service by storing bi_rados_svc in
// svc.bi. The other methods in this file (log_trim, log_start, log_stop,
// log_list, get_log_status) dereference svc.bi with no visible null check,
// so init() has to be given a valid pointer before any of them is called.
16 void RGWSI_BILog_RADOS::init(RGWSI_BucketIndex_RADOS
*bi_rados_svc
)
18 svc
.bi
= bi_rados_svc
;
// Issues a bucket index log trim for the bucket described by bucket_info.
//
// Steps visible here:
//   1. svc.bi->open_bucket_index() fills index_pool and bucket_objs for the
//      given bucket_info / shard_id.
//   2. start_marker and end_marker are each parsed into their own
//      BucketIndexShardsManager via from_string(marker, shard_id).
//   3. A CLSRGWIssueBILogTrim is built from the pool's ioctx, both marker
//      managers, bucket_objs and cct->_conf->rgw_bucket_index_max_aio, and
//      invoked immediately; its result is this function's return value.
//
// dpp is only forwarded to open_bucket_index(). start_marker and end_marker
// are taken by non-const reference, but nothing visible here assigns to them.
//
// NOTE(review): no check of `r` after open_bucket_index() / from_string() is
// visible in this view of the code (lines appear to be elided after each
// assignment to r); early-return error handling presumably lives there --
// confirm against the full file.
21 int RGWSI_BILog_RADOS::log_trim(const DoutPrefixProvider
*dpp
, const RGWBucketInfo
& bucket_info
, int shard_id
, string
& start_marker
, string
& end_marker
)
23 RGWSI_RADOS::Pool index_pool
;
24 map
<int, string
> bucket_objs
;
// One marker manager for each end of the trim range.
26 BucketIndexShardsManager start_marker_mgr
;
27 BucketIndexShardsManager end_marker_mgr
;
// Open the bucket index, filling index_pool and bucket_objs.
29 int r
= svc
.bi
->open_bucket_index(dpp
, bucket_info
, shard_id
, &index_pool
, &bucket_objs
, nullptr);
// Parse the caller-supplied start marker for this shard_id.
34 r
= start_marker_mgr
.from_string(start_marker
, shard_id
);
// Parse the caller-supplied end marker for this shard_id.
39 r
= end_marker_mgr
.from_string(end_marker
, shard_id
);
// Construct the trim operation and call it right away (note the trailing
// "()"); whatever it returns is propagated to the caller.
44 return CLSRGWIssueBILogTrim(index_pool
.ioctx(), start_marker_mgr
, end_marker_mgr
, bucket_objs
,
45 cct
->_conf
->rgw_bucket_index_max_aio
)();
// Issues a CLSRGWIssueResyncBucketBILog operation for the bucket described by
// bucket_info.
//
// Opens the bucket index through svc.bi->open_bucket_index() (filling
// index_pool and bucket_objs), then builds CLSRGWIssueResyncBucketBILog from
// the pool's ioctx, bucket_objs and cct->_conf->rgw_bucket_index_max_aio,
// invokes it, and returns its result. dpp is only forwarded to
// open_bucket_index().
//
// NOTE(review): no check of `r` is visible between open_bucket_index() and
// the final return (lines appear to be elided there); presumably a negative
// r is returned early -- confirm against the full file.
48 int RGWSI_BILog_RADOS::log_start(const DoutPrefixProvider
*dpp
, const RGWBucketInfo
& bucket_info
, int shard_id
)
50 RGWSI_RADOS::Pool index_pool
;
51 map
<int, string
> bucket_objs
;
// Open the bucket index, filling index_pool and bucket_objs.
52 int r
= svc
.bi
->open_bucket_index(dpp
, bucket_info
, shard_id
, &index_pool
, &bucket_objs
, nullptr);
// Construct the operation and call it immediately (trailing "()").
56 return CLSRGWIssueResyncBucketBILog(index_pool
.ioctx(), bucket_objs
, cct
->_conf
->rgw_bucket_index_max_aio
)();
// Issues a CLSRGWIssueBucketBILogStop operation for the bucket described by
// bucket_info. Structured exactly like log_start(): open the bucket index via
// svc.bi->open_bucket_index(), then build the operation from the pool's
// ioctx, bucket_objs and cct->_conf->rgw_bucket_index_max_aio, invoke it, and
// return its result. dpp is only forwarded to open_bucket_index().
//
// NOTE(review): no check of `r` is visible between open_bucket_index() and
// the final return (lines appear to be elided there); presumably a negative
// r is returned early -- confirm against the full file.
59 int RGWSI_BILog_RADOS::log_stop(const DoutPrefixProvider
*dpp
, const RGWBucketInfo
& bucket_info
, int shard_id
)
61 RGWSI_RADOS::Pool index_pool
;
62 map
<int, string
> bucket_objs
;
// Open the bucket index, filling index_pool and bucket_objs.
63 int r
= svc
.bi
->open_bucket_index(dpp
, bucket_info
, shard_id
, &index_pool
, &bucket_objs
, nullptr);
// Construct the operation and call it immediately (trailing "()").
67 return CLSRGWIssueBucketBILogStop(index_pool
.ioctx(), bucket_objs
, cct
->_conf
->rgw_bucket_index_max_aio
)();
// File-local helper that composes a per-shard marker string. It overwrites
// *marker with
//   shard_id_str + BucketIndexShardsManager::KEY_VALUE_SEPARATOR + shard_marker
// (assignment followed by two append() calls).
//
// NOTE(review): the declaration of the `marker` output parameter, and any
// null guard around the writes below, are not visible in this view of the
// code -- confirm against the full file. log_list() calls this with the
// address of a local string.
70 static void build_bucket_index_marker(const string
& shard_id_str
,
71 const string
& shard_marker
,
74 *marker
= shard_id_str
;
75 marker
->append(BucketIndexShardsManager::KEY_VALUE_SEPARATOR
);
76 marker
->append(shard_marker
);
// Lists bucket index log entries for bucket_info into `result`, merging the
// per-shard listings returned by CLSRGWIssueBILogList.
//
// Parameters (as used in the visible code):
//   dpp         - used for the level-20 debug line and forwarded to
//                 open_bucket_index().
//   bucket_info - identifies the bucket; bucket_info.bucket is logged.
//   shard_id    - forwarded to open_bucket_index() and
//                 marker_mgr.from_string(); also feeds has_shards.
//   marker      - in/out: parsed with marker_mgr.from_string() on entry and
//                 overwritten near the end (see the notes at the bottom).
//   max         - upper bound for the merge loops (`total < max`) and passed
//                 to CLSRGWIssueBILogList.
//   result      - entries are appended with push_back().
//   truncated   - written through; ends up true if any shard listing reported
//                 truncated or any shard still has unconsumed entries.
//
// NOTE(review): several statements this function clearly depends on are not
// visible in this view of the code: the declarations of `total`, `buf` and
// `tmp_id`, the updates of `total` / `has_more`, the advance of the per-shard
// iterator, any use of `has_shards`, any null guard on `truncated`, the
// checks of `r`, and the final return. Comments below describe only what is
// visible; confirm the rest against the full file.
80 int RGWSI_BILog_RADOS::log_list(const DoutPrefixProvider
*dpp
, const RGWBucketInfo
& bucket_info
, int shard_id
, string
& marker
, uint32_t max
,
81 std::list
<rgw_bi_log_entry
>& result
, bool *truncated
)
83 ldpp_dout(dpp
, 20) << __func__
<< ": " << bucket_info
.bucket
<< " marker " << marker
<< " shard_id=" << shard_id
<< " max " << max
<< dendl
;
86 RGWSI_RADOS::Pool index_pool
;
87 map
<int, string
> oids
;
// Per-shard listing results, filled by CLSRGWIssueBILogList below.
88 map
<int, cls_rgw_bi_log_list_ret
> bi_log_lists
;
// Open the bucket index, filling index_pool and oids.
89 int r
= svc
.bi
->open_bucket_index(dpp
, bucket_info
, shard_id
, &index_pool
, &oids
, nullptr);
93 BucketIndexShardsManager marker_mgr
;
// Treated as sharded when more than one index object was returned or the
// caller named a specific (non-negative) shard.
94 bool has_shards
= (oids
.size() > 1 || shard_id
>= 0);
95 // If the bucket index has multiple shards, the marker should have the
96 // pattern '{shard_id_1}#{shard_marker_1},{shard_id_2}#{shard_marker_2}...'.
97 // If there is no sharding, bi_log_lists should contain only one record,
98 // keyed by the bucket instance id.
99 r
= marker_mgr
.from_string(marker
, shard_id
);
// Build the list operation and call it immediately (trailing "()").
103 r
= CLSRGWIssueBILogList(index_pool
.ioctx(), marker_mgr
, max
, oids
, bi_log_lists
, cct
->_conf
->rgw_bucket_index_max_aio
)();
// vcurrents: current read position per shard; vends: matching end iterator.
// Both maps get the same set of keys, so they can be walked in lockstep.
107 map
<int, list
<rgw_bi_log_entry
>::iterator
> vcurrents
;
108 map
<int, list
<rgw_bi_log_entry
>::iterator
> vends
;
112 map
<int, cls_rgw_bi_log_list_ret
>::iterator miter
= bi_log_lists
.begin();
113 for (; miter
!= bi_log_lists
.end(); ++miter
) {
// Note: this local shard_id shadows the function parameter of the same name.
114 int shard_id
= miter
->first
;
115 vcurrents
[shard_id
] = miter
->second
.entries
.begin();
116 vends
[shard_id
] = miter
->second
.entries
.end();
// Accumulate: truncated if any individual shard listing was truncated.
118 *truncated
= (*truncated
|| miter
->second
.truncated
);
123 bool has_more
= true;
124 map
<int, list
<rgw_bi_log_entry
>::iterator
>::iterator viter
;
125 map
<int, list
<rgw_bi_log_entry
>::iterator
>::iterator eiter
;
// Outer loop: keep making passes over the shards while below `max` and
// has_more is set.
126 while (total
< max
&& has_more
) {
129 viter
= vcurrents
.begin();
130 eiter
= vends
.begin();
// Inner loop: one pass over the shards, walking vcurrents and vends together.
132 for (; total
< max
&& viter
!= vcurrents
.end(); ++viter
, ++eiter
) {
133 assert (eiter
!= vends
.end());
// Shadows the function parameter again; here it is the shard being visited.
135 int shard_id
= viter
->first
;
// Reference into vcurrents, so changes to liter update the stored position.
136 list
<rgw_bi_log_entry
>::iterator
& liter
= viter
->second
;
// True when this shard has no entries left to consume.
138 if (liter
== eiter
->second
){
141 rgw_bi_log_entry
& entry
= *(liter
);
// Rewrite entry.id as "<shard_id><separator><original id>": format the shard
// id as decimal text, compose the marker into tmp_id, then swap it in.
144 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
146 build_bucket_index_marker(buf
, entry
.id
, &tmp_id
);
147 entry
.id
.swap(tmp_id
);
// Record this shard's latest position and hand a copy of the entry to the
// caller.
149 marker_mgr
.add(shard_id
, entry
.id
);
150 result
.push_back(entry
);
// After merging: also report truncation if any shard still has entries that
// were not consumed (current position != end).
158 for (viter
= vcurrents
.begin(), eiter
= vends
.begin(); viter
!= vcurrents
.end(); ++viter
, ++eiter
) {
159 assert (eiter
!= vends
.end());
160 *truncated
= (*truncated
|| (viter
->second
!= eiter
->second
));
164 // Refresh the marker. With multiple shards the output looks like
165 // '{shard_oid_1}#{shard_marker_1},{shard_oid_2}#{shard_marker_2}...';
166 // without sharding, the plain marker (without oid) is returned.
// NOTE(review): the condition choosing between the two marker updates below
// is not visible here; per the comment above it is presumably has_shards.
168 marker_mgr
.to_string(&marker
);
170 if (!result
.empty()) {
// Use the id of the last entry appended to result.
171 marker
= result
.rbegin()->id
;
// Collects the max_marker of each bucket index header into *markers.
//
// svc.bi->cls_bucket_head() fills `headers` and `bucket_instance_ids`; the
// two containers are asserted to have the same size and are then walked in
// lockstep. For each header, iter->max_marker is stored in *markers under
// either shard_id or the current bucket_instance_ids key (viter->first).
//
// dpp and y are only forwarded to cls_bucket_head(). markers is dereferenced
// without a visible null check.
//
// NOTE(review): the declarations of the `shard_id` and `y` parameters, the
// check of `r`, the condition selecting between the two assignments in the
// loop, and the final return are not visible in this view of the code --
// confirm against the full file.
178 int RGWSI_BILog_RADOS::get_log_status(const DoutPrefixProvider
*dpp
,
179 const RGWBucketInfo
& bucket_info
,
181 map
<int, string
> *markers
,
184 vector
<rgw_bucket_dir_header
> headers
;
185 map
<int, string
> bucket_instance_ids
;
186 int r
= svc
.bi
->cls_bucket_head(dpp
, bucket_info
, shard_id
, &headers
, &bucket_instance_ids
, y
);
// The loop below advances both iterators together, so the sizes must match.
190 ceph_assert(headers
.size() == bucket_instance_ids
.size());
192 auto iter
= headers
.begin();
193 map
<int, string
>::iterator viter
= bucket_instance_ids
.begin();
195 for(; iter
!= headers
.end(); ++iter
, ++viter
) {
// Keyed by the caller-supplied shard_id ...
197 (*markers
)[shard_id
] = iter
->max_marker
;
// ... or by the key taken from bucket_instance_ids.
199 (*markers
)[viter
->first
] = iter
->max_marker
;