]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/services/svc_bilog_rados.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / services / svc_bilog_rados.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "svc_bilog_rados.h"
5 #include "svc_bi_rados.h"
6
7 #include "cls/rgw/cls_rgw_client.h"
8
9
10 #define dout_subsys ceph_subsys_rgw
11
12 RGWSI_BILog_RADOS::RGWSI_BILog_RADOS(CephContext *cct) : RGWServiceInstance(cct)
13 {
14 }
15
16 void RGWSI_BILog_RADOS::init(RGWSI_BucketIndex_RADOS *bi_rados_svc)
17 {
18 svc.bi = bi_rados_svc;
19 }
20
21 int RGWSI_BILog_RADOS::log_trim(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id, string& start_marker, string& end_marker)
22 {
23 RGWSI_RADOS::Pool index_pool;
24 map<int, string> bucket_objs;
25
26 BucketIndexShardsManager start_marker_mgr;
27 BucketIndexShardsManager end_marker_mgr;
28
29 int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, &index_pool, &bucket_objs, nullptr);
30 if (r < 0) {
31 return r;
32 }
33
34 r = start_marker_mgr.from_string(start_marker, shard_id);
35 if (r < 0) {
36 return r;
37 }
38
39 r = end_marker_mgr.from_string(end_marker, shard_id);
40 if (r < 0) {
41 return r;
42 }
43
44 return CLSRGWIssueBILogTrim(index_pool.ioctx(), start_marker_mgr, end_marker_mgr, bucket_objs,
45 cct->_conf->rgw_bucket_index_max_aio)();
46 }
47
48 int RGWSI_BILog_RADOS::log_start(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id)
49 {
50 RGWSI_RADOS::Pool index_pool;
51 map<int, string> bucket_objs;
52 int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, &index_pool, &bucket_objs, nullptr);
53 if (r < 0)
54 return r;
55
56 return CLSRGWIssueResyncBucketBILog(index_pool.ioctx(), bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
57 }
58
59 int RGWSI_BILog_RADOS::log_stop(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id)
60 {
61 RGWSI_RADOS::Pool index_pool;
62 map<int, string> bucket_objs;
63 int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, &index_pool, &bucket_objs, nullptr);
64 if (r < 0)
65 return r;
66
67 return CLSRGWIssueBucketBILogStop(index_pool.ioctx(), bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
68 }
69
70 static void build_bucket_index_marker(const string& shard_id_str,
71 const string& shard_marker,
72 string *marker) {
73 if (marker) {
74 *marker = shard_id_str;
75 marker->append(BucketIndexShardsManager::KEY_VALUE_SEPARATOR);
76 marker->append(shard_marker);
77 }
78 }
79
80 int RGWSI_BILog_RADOS::log_list(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id, string& marker, uint32_t max,
81 std::list<rgw_bi_log_entry>& result, bool *truncated)
82 {
83 ldpp_dout(dpp, 20) << __func__ << ": " << bucket_info.bucket << " marker " << marker << " shard_id=" << shard_id << " max " << max << dendl;
84 result.clear();
85
86 RGWSI_RADOS::Pool index_pool;
87 map<int, string> oids;
88 map<int, cls_rgw_bi_log_list_ret> bi_log_lists;
89 int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, &index_pool, &oids, nullptr);
90 if (r < 0)
91 return r;
92
93 BucketIndexShardsManager marker_mgr;
94 bool has_shards = (oids.size() > 1 || shard_id >= 0);
95 // If there are multiple shards for the bucket index object, the marker
96 // should have the pattern '{shard_id_1}#{shard_marker_1},{shard_id_2}#
97 // {shard_marker_2}...', if there is no sharding, the bi_log_list should
98 // only contain one record, and the key is the bucket instance id.
99 r = marker_mgr.from_string(marker, shard_id);
100 if (r < 0)
101 return r;
102
103 r = CLSRGWIssueBILogList(index_pool.ioctx(), marker_mgr, max, oids, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)();
104 if (r < 0)
105 return r;
106
107 map<int, list<rgw_bi_log_entry>::iterator> vcurrents;
108 map<int, list<rgw_bi_log_entry>::iterator> vends;
109 if (truncated) {
110 *truncated = false;
111 }
112 map<int, cls_rgw_bi_log_list_ret>::iterator miter = bi_log_lists.begin();
113 for (; miter != bi_log_lists.end(); ++miter) {
114 int shard_id = miter->first;
115 vcurrents[shard_id] = miter->second.entries.begin();
116 vends[shard_id] = miter->second.entries.end();
117 if (truncated) {
118 *truncated = (*truncated || miter->second.truncated);
119 }
120 }
121
122 size_t total = 0;
123 bool has_more = true;
124 map<int, list<rgw_bi_log_entry>::iterator>::iterator viter;
125 map<int, list<rgw_bi_log_entry>::iterator>::iterator eiter;
126 while (total < max && has_more) {
127 has_more = false;
128
129 viter = vcurrents.begin();
130 eiter = vends.begin();
131
132 for (; total < max && viter != vcurrents.end(); ++viter, ++eiter) {
133 assert (eiter != vends.end());
134
135 int shard_id = viter->first;
136 list<rgw_bi_log_entry>::iterator& liter = viter->second;
137
138 if (liter == eiter->second){
139 continue;
140 }
141 rgw_bi_log_entry& entry = *(liter);
142 if (has_shards) {
143 char buf[16];
144 snprintf(buf, sizeof(buf), "%d", shard_id);
145 string tmp_id;
146 build_bucket_index_marker(buf, entry.id, &tmp_id);
147 entry.id.swap(tmp_id);
148 }
149 marker_mgr.add(shard_id, entry.id);
150 result.push_back(entry);
151 total++;
152 has_more = true;
153 ++liter;
154 }
155 }
156
157 if (truncated) {
158 for (viter = vcurrents.begin(), eiter = vends.begin(); viter != vcurrents.end(); ++viter, ++eiter) {
159 assert (eiter != vends.end());
160 *truncated = (*truncated || (viter->second != eiter->second));
161 }
162 }
163
164 // Refresh marker, if there are multiple shards, the output will look like
165 // '{shard_oid_1}#{shard_marker_1},{shard_oid_2}#{shard_marker_2}...',
166 // if there is no sharding, the simply marker (without oid) is returned
167 if (has_shards) {
168 marker_mgr.to_string(&marker);
169 } else {
170 if (!result.empty()) {
171 marker = result.rbegin()->id;
172 }
173 }
174
175 return 0;
176 }
177
178 int RGWSI_BILog_RADOS::get_log_status(const DoutPrefixProvider *dpp,
179 const RGWBucketInfo& bucket_info,
180 int shard_id,
181 map<int, string> *markers,
182 optional_yield y)
183 {
184 vector<rgw_bucket_dir_header> headers;
185 map<int, string> bucket_instance_ids;
186 int r = svc.bi->cls_bucket_head(dpp, bucket_info, shard_id, &headers, &bucket_instance_ids, y);
187 if (r < 0)
188 return r;
189
190 ceph_assert(headers.size() == bucket_instance_ids.size());
191
192 auto iter = headers.begin();
193 map<int, string>::iterator viter = bucket_instance_ids.begin();
194
195 for(; iter != headers.end(); ++iter, ++viter) {
196 if (shard_id >= 0) {
197 (*markers)[shard_id] = iter->max_marker;
198 } else {
199 (*markers)[viter->first] = iter->max_marker;
200 }
201 }
202
203 return 0;
204 }
205