// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
3 | ||
4 | #include "rgw_rados.h" | |
5 | #include "rgw_bucket.h" | |
6 | #include "rgw_reshard.h" | |
7 | #include "cls/rgw/cls_rgw_client.h" | |
8 | #include "cls/lock/cls_lock_client.h" | |
9 | #include "common/errno.h" | |
10 | #include "common/ceph_json.h" | |
11 | ||
12 | #include "common/dout.h" | |
13 | ||
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw

// Name prefix for reshard-log shard objects in the reshard pool.
const string reshard_oid_prefix = "reshard.";
// Rados lock name held while processing a reshard-log shard / bucket.
const string reshard_lock_name = "reshard_process";
// Rados lock name for per-bucket-instance operations.
const string bucket_instance_lock_name = "bucket_instance_lock";

using namespace std;

// Flush buffered target-shard index entries once this many accumulate.
#define RESHARD_SHARD_WINDOW 64
// Cap on in-flight AIO completions before we block waiting for one.
#define RESHARD_MAX_AIO 128
25 | ||
// Buffers bucket-index entries destined for one shard of the target
// (resharded) bucket and writes them out in batches of
// RESHARD_SHARD_WINDOW entries using asynchronous rados operations.
// All shards of one reshard share a single AIO completion queue so the
// RESHARD_MAX_AIO cap is global across shards.
class BucketReshardShard {
  RGWRados *store;
  const RGWBucketInfo& bucket_info;  // target bucket's info
  int num_shard;                     // shard id, or -1 for an unsharded index
  RGWRados::BucketShard bs;
  vector<rgw_cls_bi_entry> entries;  // entries buffered since last flush
  map<uint8_t, rgw_bucket_category_stats> stats;  // per-category stat deltas
  deque<librados::AioCompletion *>& aio_completions;  // shared across shards

  // Pop the oldest in-flight completion, wait for it, and surface its
  // return value.
  int wait_next_completion() {
    librados::AioCompletion *c = aio_completions.front();
    aio_completions.pop_front();

    c->wait_for_safe();

    int ret = c->get_return_value();
    c->release();

    if (ret < 0) {
      derr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << dendl;
      return ret;
    }

    return 0;
  }

  // Allocate a fresh completion for the caller, first draining one if we
  // are already at the RESHARD_MAX_AIO in-flight cap.
  int get_completion(librados::AioCompletion **c) {
    if (aio_completions.size() >= RESHARD_MAX_AIO) {
      int ret = wait_next_completion();
      if (ret < 0) {
        return ret;
      }
    }

    *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
    aio_completions.push_back(*c);

    return 0;
  }

public:
  BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
                     int _num_shard,
                     deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
                                                                       aio_completions(_completions) {
    // A bucket without shards uses id -1 (single index object).
    num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
    bs.init(bucket_info.bucket, num_shard);
  }

  int get_num_shard() {
    return num_shard;
  }

  // Queue one index entry for this shard.  When 'account' is set the
  // entry also contributes to the per-category stats written alongside
  // the batch.  Auto-flushes at RESHARD_SHARD_WINDOW entries.
  int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category,
                const rgw_bucket_category_stats& entry_stats) {
    entries.push_back(entry);
    if (account) {
      rgw_bucket_category_stats& target = stats[category];
      target.num_entries += entry_stats.num_entries;
      target.total_size += entry_stats.total_size;
      target.total_size_rounded += entry_stats.total_size_rounded;
      target.actual_size += entry_stats.actual_size;
    }
    if (entries.size() >= RESHARD_SHARD_WINDOW) {
      int ret = flush();
      if (ret < 0) {
        return ret;
      }
    }
    return 0;
  }
  // Write all buffered entries plus accumulated stat deltas to this
  // shard's index object in one asynchronous write op.
  int flush() {
    if (entries.size() == 0) {
      return 0;
    }

    librados::ObjectWriteOperation op;
    for (auto& entry : entries) {
      store->bi_put(op, bs, entry);
    }
    // false: add deltas rather than replacing absolute stats
    cls_rgw_bucket_update_stats(op, false, stats);

    librados::AioCompletion *c;
    int ret = get_completion(&c);
    if (ret < 0) {
      return ret;
    }
    ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
    if (ret < 0) {
      derr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << dendl;
      return ret;
    }
    entries.clear();
    stats.clear();
    return 0;
  }

  // Drain every outstanding completion on the shared queue, returning the
  // last error seen (0 if all succeeded).
  int wait_all_aio() {
    int ret = 0;
    while (!aio_completions.empty()) {
      int r = wait_next_completion();
      if (r < 0) {
        ret = r;
      }
    }
    return ret;
  }
};
134 | ||
135 | class BucketReshardManager { | |
136 | RGWRados *store; | |
137 | const RGWBucketInfo& target_bucket_info; | |
138 | deque<librados::AioCompletion *> completions; | |
139 | int num_target_shards; | |
140 | vector<BucketReshardShard *> target_shards; | |
141 | ||
142 | public: | |
143 | BucketReshardManager(RGWRados *_store, const RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info), | |
144 | num_target_shards(_num_target_shards) { | |
145 | target_shards.resize(num_target_shards); | |
146 | for (int i = 0; i < num_target_shards; ++i) { | |
147 | target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions); | |
148 | } | |
149 | } | |
150 | ||
151 | ~BucketReshardManager() { | |
152 | for (auto& shard : target_shards) { | |
153 | int ret = shard->wait_all_aio(); | |
154 | if (ret < 0) { | |
155 | ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl; | |
156 | } | |
157 | } | |
158 | } | |
159 | ||
160 | int add_entry(int shard_index, | |
161 | rgw_cls_bi_entry& entry, bool account, uint8_t category, | |
162 | const rgw_bucket_category_stats& entry_stats) { | |
163 | int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats); | |
164 | if (ret < 0) { | |
165 | derr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << dendl; | |
166 | return ret; | |
167 | } | |
168 | return 0; | |
169 | } | |
170 | ||
171 | int finish() { | |
172 | int ret = 0; | |
173 | for (auto& shard : target_shards) { | |
174 | int r = shard->flush(); | |
175 | if (r < 0) { | |
176 | derr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << dendl; | |
177 | ret = r; | |
178 | } | |
179 | } | |
180 | for (auto& shard : target_shards) { | |
181 | int r = shard->wait_all_aio(); | |
182 | if (r < 0) { | |
183 | derr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl; | |
184 | ret = r; | |
185 | } | |
186 | delete shard; | |
187 | } | |
188 | target_shards.clear(); | |
189 | return ret; | |
190 | } | |
191 | }; | |
192 | ||
193 | RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs) : | |
194 | store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs), | |
195 | reshard_lock(reshard_lock_name) { | |
196 | const rgw_bucket& b = bucket_info.bucket; | |
197 | reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id; | |
198 | ||
199 | utime_t lock_duration(store->ctx()->_conf->rgw_reshard_bucket_lock_duration, 0); | |
200 | #define COOKIE_LEN 16 | |
201 | char cookie_buf[COOKIE_LEN + 1]; | |
202 | gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1); | |
203 | cookie_buf[COOKIE_LEN] = '\0'; | |
204 | ||
205 | reshard_lock.set_cookie(cookie_buf); | |
206 | reshard_lock.set_duration(lock_duration); | |
207 | } | |
208 | ||
209 | int RGWBucketReshard::lock_bucket() | |
210 | { | |
211 | int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid); | |
212 | if (ret < 0) { | |
213 | ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << " ret=" << ret << dendl; | |
214 | return ret; | |
215 | } | |
216 | return 0; | |
217 | } | |
218 | ||
219 | void RGWBucketReshard::unlock_bucket() | |
220 | { | |
221 | int ret = reshard_lock.unlock(&store->reshard_pool_ctx, reshard_oid); | |
222 | if (ret < 0) { | |
223 | ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl; | |
224 | } | |
225 | } | |
226 | ||
227 | int RGWBucketReshard::set_resharding_status(const string& new_instance_id, int32_t num_shards, cls_rgw_reshard_status status) | |
228 | { | |
229 | if (new_instance_id.empty()) { | |
230 | ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl; | |
231 | return -EINVAL; | |
232 | } | |
233 | ||
234 | cls_rgw_bucket_instance_entry instance_entry; | |
235 | instance_entry.set_status(new_instance_id, num_shards, status); | |
236 | ||
237 | int ret = store->bucket_set_reshard(bucket_info, instance_entry); | |
238 | if (ret < 0) { | |
239 | ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: " | |
240 | << cpp_strerror(-ret) << dendl; | |
241 | return ret; | |
242 | } | |
243 | return 0; | |
244 | } | |
245 | ||
246 | int RGWBucketReshard::clear_resharding() | |
247 | { | |
248 | cls_rgw_bucket_instance_entry instance_entry; | |
249 | ||
250 | int ret = store->bucket_set_reshard(bucket_info, instance_entry); | |
251 | if (ret < 0) { | |
252 | ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: " | |
253 | << cpp_strerror(-ret) << dendl; | |
254 | return ret; | |
255 | } | |
256 | return 0; | |
257 | } | |
258 | ||
259 | static int create_new_bucket_instance(RGWRados *store, | |
260 | int new_num_shards, | |
261 | const RGWBucketInfo& bucket_info, | |
262 | map<string, bufferlist>& attrs, | |
263 | RGWBucketInfo& new_bucket_info) | |
264 | { | |
265 | new_bucket_info = bucket_info; | |
266 | ||
267 | store->create_bucket_id(&new_bucket_info.bucket.bucket_id); | |
268 | new_bucket_info.bucket.oid.clear(); | |
269 | ||
270 | new_bucket_info.num_shards = new_num_shards; | |
271 | new_bucket_info.objv_tracker.clear(); | |
272 | ||
273 | new_bucket_info.new_bucket_instance_id.clear(); | |
274 | new_bucket_info.reshard_status = 0; | |
275 | ||
276 | int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards); | |
277 | if (ret < 0) { | |
278 | cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl; | |
279 | return -ret; | |
280 | } | |
281 | ||
282 | ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs); | |
283 | if (ret < 0) { | |
284 | cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl; | |
285 | return -ret; | |
286 | } | |
287 | ||
288 | return 0; | |
289 | } | |
290 | ||
291 | int RGWBucketReshard::create_new_bucket_instance(int new_num_shards, | |
292 | RGWBucketInfo& new_bucket_info) | |
293 | { | |
294 | return ::create_new_bucket_instance(store, new_num_shards, bucket_info, bucket_attrs, new_bucket_info); | |
295 | } | |
296 | ||
94b18763 FG |
297 | int RGWBucketReshard::cancel() |
298 | { | |
299 | int ret = lock_bucket(); | |
300 | if (ret < 0) { | |
301 | return ret; | |
302 | } | |
303 | ||
304 | ret = clear_resharding(); | |
305 | ||
306 | unlock_bucket(); | |
307 | return 0; | |
308 | } | |
309 | ||
// Scope guard around the source bucket's "reshard in progress" flag.
// The constructor records the new instance id on a private copy of the
// bucket info; start() persists IN_PROGRESS, complete() persists DONE.
// If the guard is destroyed while still in progress (i.e. an error path
// skipped complete()), the destructor rolls the flag back to
// CLS_RGW_RESHARD_NONE on a best-effort basis.
class BucketInfoReshardUpdate
{
  RGWRados *store;
  RGWBucketInfo bucket_info;                // private copy we mutate/persist
  std::map<string, bufferlist> bucket_attrs;

  bool in_progress{false};  // true between successful start() and complete()

  // Persist bucket_info with the given reshard status.
  int set_status(cls_rgw_reshard_status s) {
    bucket_info.reshard_status = s;
    int ret = store->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs);
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }

public:
  BucketInfoReshardUpdate(RGWRados *_store, RGWBucketInfo& _bucket_info,
                          map<string, bufferlist>& _bucket_attrs, const string& new_bucket_id) : store(_store),
                                                                                                bucket_info(_bucket_info),
                                                                                                bucket_attrs(_bucket_attrs) {
    bucket_info.new_bucket_instance_id = new_bucket_id;
  }
  ~BucketInfoReshardUpdate() {
    if (in_progress) {
      // error path: clear the marker so the bucket is usable again
      bucket_info.new_bucket_instance_id.clear();
      set_status(CLS_RGW_RESHARD_NONE);
    }
  }

  int start() {
    int ret = set_status(CLS_RGW_RESHARD_IN_PROGRESS);
    if (ret < 0) {
      return ret;
    }
    in_progress = true;
    return 0;
  }

  int complete() {
    int ret = set_status(CLS_RGW_RESHARD_DONE);
    if (ret < 0) {
      return ret;
    }
    in_progress = false;
    return 0;
  }
};
360 | ||
361 | int RGWBucketReshard::do_reshard( | |
362 | int num_shards, | |
b32b8144 | 363 | RGWBucketInfo& new_bucket_info, |
31f18b77 FG |
364 | int max_entries, |
365 | bool verbose, | |
366 | ostream *out, | |
367 | Formatter *formatter) | |
368 | { | |
369 | rgw_bucket& bucket = bucket_info.bucket; | |
370 | ||
371 | int ret = 0; | |
372 | ||
373 | if (out) { | |
374 | (*out) << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl; | |
375 | (*out) << "*** these will need to be removed manually ***" << std::endl; | |
376 | (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl; | |
377 | (*out) << "bucket name: " << bucket_info.bucket.name << std::endl; | |
378 | (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id << std::endl; | |
379 | (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << std::endl; | |
380 | } | |
381 | ||
382 | /* update bucket info -- in progress*/ | |
383 | list<rgw_cls_bi_entry> entries; | |
384 | ||
385 | if (max_entries < 0) { | |
386 | ldout(store->ctx(), 0) << __func__ << ": can't reshard, negative max_entries" << dendl; | |
387 | return -EINVAL; | |
388 | } | |
389 | ||
390 | BucketInfoReshardUpdate bucket_info_updater(store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id); | |
391 | ||
392 | ret = bucket_info_updater.start(); | |
393 | if (ret < 0) { | |
394 | ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl; | |
395 | return ret; | |
396 | } | |
397 | ||
398 | int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1); | |
399 | ||
400 | BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards); | |
401 | ||
402 | verbose = verbose && (formatter != nullptr); | |
403 | ||
404 | if (verbose) { | |
405 | formatter->open_array_section("entries"); | |
406 | } | |
407 | ||
408 | uint64_t total_entries = 0; | |
409 | ||
410 | if (!verbose) { | |
411 | cout << "total entries:"; | |
412 | } | |
413 | ||
414 | int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1); | |
415 | string marker; | |
416 | for (int i = 0; i < num_source_shards; ++i) { | |
417 | bool is_truncated = true; | |
418 | marker.clear(); | |
419 | while (is_truncated) { | |
420 | entries.clear(); | |
421 | ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated); | |
422 | if (ret < 0 && ret != -ENOENT) { | |
423 | derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl; | |
424 | return -ret; | |
425 | } | |
426 | ||
427 | list<rgw_cls_bi_entry>::iterator iter; | |
428 | for (iter = entries.begin(); iter != entries.end(); ++iter) { | |
429 | rgw_cls_bi_entry& entry = *iter; | |
430 | if (verbose) { | |
431 | formatter->open_object_section("entry"); | |
432 | ||
433 | encode_json("shard_id", i, formatter); | |
434 | encode_json("num_entry", total_entries, formatter); | |
435 | encode_json("entry", entry, formatter); | |
436 | } | |
437 | total_entries++; | |
438 | ||
439 | marker = entry.idx; | |
440 | ||
441 | int target_shard_id; | |
442 | cls_rgw_obj_key cls_key; | |
443 | uint8_t category; | |
444 | rgw_bucket_category_stats stats; | |
445 | bool account = entry.get_info(&cls_key, &category, &stats); | |
446 | rgw_obj_key key(cls_key); | |
447 | rgw_obj obj(new_bucket_info.bucket, key); | |
448 | int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id); | |
449 | if (ret < 0) { | |
450 | lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl; | |
451 | return ret; | |
452 | } | |
453 | ||
454 | int shard_index = (target_shard_id > 0 ? target_shard_id : 0); | |
455 | ||
456 | ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats); | |
457 | if (ret < 0) { | |
458 | return ret; | |
459 | } | |
460 | if (verbose) { | |
461 | formatter->close_section(); | |
462 | if (out) { | |
463 | formatter->flush(*out); | |
464 | formatter->flush(*out); | |
465 | } | |
466 | } else if (out && !(total_entries % 1000)) { | |
467 | (*out) << " " << total_entries; | |
468 | } | |
469 | } | |
470 | } | |
471 | } | |
472 | if (verbose) { | |
473 | formatter->close_section(); | |
474 | if (out) { | |
475 | formatter->flush(*out); | |
476 | } | |
477 | } else if (out) { | |
478 | (*out) << " " << total_entries << std::endl; | |
479 | } | |
480 | ||
481 | ret = target_shards_mgr.finish(); | |
482 | if (ret < 0) { | |
483 | lderr(store->ctx()) << "ERROR: failed to reshard" << dendl; | |
484 | return EIO; | |
485 | } | |
486 | ||
b32b8144 FG |
487 | ret = rgw_link_bucket(store, new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time); |
488 | if (ret < 0) { | |
489 | lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl; | |
490 | return -ret; | |
31f18b77 FG |
491 | } |
492 | ||
493 | ret = bucket_info_updater.complete(); | |
494 | if (ret < 0) { | |
495 | ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl; | |
496 | /* don't error out, reshard process succeeded */ | |
497 | } | |
498 | return 0; | |
499 | } | |
500 | ||
501 | int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status) | |
502 | { | |
503 | librados::IoCtx index_ctx; | |
504 | map<int, string> bucket_objs; | |
505 | ||
506 | int r = store->open_bucket_index(bucket_info, index_ctx, bucket_objs); | |
507 | if (r < 0) { | |
508 | return r; | |
509 | } | |
510 | ||
511 | for (auto i : bucket_objs) { | |
512 | cls_rgw_bucket_instance_entry entry; | |
513 | ||
514 | int ret = cls_rgw_get_bucket_resharding(index_ctx, i.second, &entry); | |
515 | if (ret < 0 && ret != -ENOENT) { | |
516 | lderr(store->ctx()) << "ERROR: " << __func__ << ": cls_rgw_get_bucket_resharding() returned ret=" << ret << dendl; | |
517 | return ret; | |
518 | } | |
519 | ||
520 | status->push_back(entry); | |
521 | } | |
522 | ||
523 | return 0; | |
524 | } | |
525 | ||
/**
 * Top-level reshard entry point: takes the per-bucket reshard lock,
 * creates the new bucket instance, flags the bucket index as resharding,
 * copies all index entries via do_reshard(), and finally marks the
 * reshard done.  The lock is released on every exit path.
 *
 * @param num_shards     shard count for the new bucket instance
 * @param max_op_entries max index entries requested per bi_list() call
 * @param verbose        emit per-entry JSON via formatter
 * @param out            optional progress output stream
 * @param formatter      formatter used when verbose
 * @param reshard_log    optional reshard queue to record the new
 *                       instance id on (used by the background worker)
 * @return 0 on success, negative error code otherwise.
 */
int RGWBucketReshard::execute(int num_shards, int max_op_entries,
                              bool verbose, ostream *out, Formatter *formatter, RGWReshard* reshard_log)

{
  int ret = lock_bucket();
  if (ret < 0) {
    return ret;
  }

  RGWBucketInfo new_bucket_info;
  ret = create_new_bucket_instance(num_shards, new_bucket_info);
  if (ret < 0) {
    unlock_bucket();
    return ret;
  }

  // When driven from the reshard queue, record the new instance id on
  // the queue entry before starting the copy.
  if (reshard_log) {
    ret = reshard_log->update(bucket_info, new_bucket_info);
    if (ret < 0) {
      unlock_bucket();
      return ret;
    }
  }

  // Flag the source bucket index as resharding so concurrent writers
  // know to block/retry while entries are copied.
  ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
  if (ret < 0) {
    unlock_bucket();
    return ret;
  }

  ret = do_reshard(num_shards,
                   new_bucket_info,
                   max_op_entries,
                   verbose, out, formatter);

  if (ret < 0) {
    unlock_bucket();
    return ret;
  }

  ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_DONE);
  if (ret < 0) {
    unlock_bucket();
    return ret;
  }

  unlock_bucket();

  return 0;
}
576 | ||
577 | ||
// Construct the reshard queue manager.  The number of reshard log
// shards comes from the rgw_reshard_num_logs config option.
RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out,
                       Formatter *_formatter) : store(_store), instance_lock(bucket_instance_lock_name),
                                                verbose(_verbose), out(_out), formatter(_formatter)
{
  num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
}
584 | ||
585 | string RGWReshard::get_logshard_key(const string& tenant, const string& bucket_name) | |
586 | { | |
587 | return tenant + ":" + bucket_name; | |
588 | } | |
589 | ||
590 | #define MAX_RESHARD_LOGSHARDS_PRIME 7877 | |
591 | ||
592 | void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid) | |
593 | { | |
594 | string key = get_logshard_key(tenant, bucket_name); | |
595 | ||
596 | uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size()); | |
597 | uint32_t sid2 = sid ^ ((sid & 0xFF) << 24); | |
598 | sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards; | |
31f18b77 | 599 | |
1adf2230 | 600 | get_logshard_oid(int(sid), oid); |
31f18b77 FG |
601 | } |
602 | ||
603 | int RGWReshard::add(cls_rgw_reshard_entry& entry) | |
604 | { | |
3efd9988 FG |
605 | if (!store->can_reshard()) { |
606 | ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl; | |
607 | return 0; | |
608 | } | |
609 | ||
31f18b77 FG |
610 | string logshard_oid; |
611 | ||
612 | get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid); | |
613 | ||
614 | librados::ObjectWriteOperation op; | |
615 | cls_rgw_reshard_add(op, entry); | |
616 | ||
617 | int ret = store->reshard_pool_ctx.operate(logshard_oid, &op); | |
618 | if (ret < 0) { | |
619 | lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl; | |
620 | return ret; | |
621 | } | |
622 | return 0; | |
623 | } | |
624 | ||
625 | int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info) | |
626 | { | |
627 | cls_rgw_reshard_entry entry; | |
628 | entry.bucket_name = bucket_info.bucket.name; | |
629 | entry.bucket_id = bucket_info.bucket.bucket_id; | |
b32b8144 | 630 | entry.tenant = bucket_info.owner.tenant; |
31f18b77 FG |
631 | |
632 | int ret = get(entry); | |
633 | if (ret < 0) { | |
634 | return ret; | |
635 | } | |
636 | ||
637 | entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id; | |
638 | ||
639 | ret = add(entry); | |
640 | if (ret < 0) { | |
641 | ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " << | |
642 | cpp_strerror(-ret) << dendl; | |
643 | } | |
644 | ||
645 | return ret; | |
646 | } | |
647 | ||
648 | ||
649 | int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated) | |
650 | { | |
651 | string logshard_oid; | |
652 | ||
653 | get_logshard_oid(logshard_num, &logshard_oid); | |
654 | ||
655 | int ret = cls_rgw_reshard_list(store->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated); | |
c07f9fc5 FG |
656 | |
657 | if (ret < 0) { | |
658 | if (ret == -ENOENT) { | |
659 | *is_truncated = false; | |
660 | ret = 0; | |
661 | } | |
31f18b77 | 662 | lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl; |
c07f9fc5 FG |
663 | if (ret == -EACCES) { |
664 | lderr(store->ctx()) << "access denied to pool " << store->get_zone_params().reshard_pool | |
665 | << ". Fix the pool access permissions of your client" << dendl; | |
666 | } | |
31f18b77 | 667 | } |
c07f9fc5 FG |
668 | |
669 | return ret; | |
31f18b77 FG |
670 | } |
671 | ||
672 | int RGWReshard::get(cls_rgw_reshard_entry& entry) | |
673 | { | |
674 | string logshard_oid; | |
675 | ||
676 | get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid); | |
677 | ||
678 | int ret = cls_rgw_reshard_get(store->reshard_pool_ctx, logshard_oid, entry); | |
679 | if (ret < 0) { | |
94b18763 FG |
680 | if (ret != -ENOENT) { |
681 | lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << | |
682 | " bucket=" << entry.bucket_name << dendl; | |
683 | } | |
31f18b77 FG |
684 | return ret; |
685 | } | |
686 | ||
687 | return 0; | |
688 | } | |
689 | ||
690 | int RGWReshard::remove(cls_rgw_reshard_entry& entry) | |
691 | { | |
692 | string logshard_oid; | |
693 | ||
694 | get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid); | |
695 | ||
696 | librados::ObjectWriteOperation op; | |
697 | cls_rgw_reshard_remove(op, entry); | |
698 | ||
699 | int ret = store->reshard_pool_ctx.operate(logshard_oid, &op); | |
700 | if (ret < 0) { | |
701 | lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl; | |
702 | return ret; | |
703 | } | |
704 | ||
705 | return ret; | |
706 | } | |
707 | ||
708 | int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry) | |
709 | { | |
710 | int ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid); | |
711 | if (ret < 0) { | |
712 | lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl; | |
713 | return ret; | |
714 | } | |
715 | ||
716 | return 0; | |
717 | } | |
718 | ||
// Number of times block_while_resharding() polls before giving up.
const int num_retries = 10;
// Seconds slept between polls while waiting for a reshard to finish.
const int default_reshard_sleep_duration = 5;
721 | ||
// Sleep up to default_reshard_sleep_duration seconds (or until the
// condition is signalled, e.g. on shutdown), then report -ECANCELED if
// shutdown was requested meanwhile.
int RGWReshardWait::do_wait()
{
  Mutex::Locker l(lock);

  cond.WaitInterval(lock, utime_t(default_reshard_sleep_duration, 0));

  if (going_down) {
    return -ECANCELED;
  }

  return 0;
}
734 | ||
// Poll the bucket shard's resharding flag up to num_retries times,
// sleeping via do_wait() between polls.  Returns 0 (storing the new
// instance id in *new_bucket_id) once the reshard has finished, a
// negative error on lookup/shutdown failure, or -ERR_BUSY_RESHARDING if
// the reshard is still running after all retries so the caller can
// bounce the request back to the client.
int RGWReshardWait::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
{
  int ret = 0;
  cls_rgw_bucket_instance_entry entry;

  for (int i=0; i < num_retries;i++) {
    ret = cls_rgw_get_bucket_resharding(bs->index_ctx, bs->bucket_obj, &entry);
    if (ret < 0) {
      ldout(store->ctx(), 0) << __func__ << " ERROR: failed to get bucket resharding :" <<
        cpp_strerror(-ret)<< dendl;
      return ret;
    }
    if (!entry.resharding_in_progress()) {
      // reshard complete: hand back the id of the instance to retry on
      *new_bucket_id = entry.new_bucket_instance_id;
      return 0;
    }
    ldout(store->ctx(), 20) << "NOTICE: reshard still in progress; " << (i < num_retries - 1 ? "retrying" : "too many retries") << dendl;
    /* needed to unlock as clear resharding uses the same lock */

    // skip the final sleep — we are about to give up anyway
    if (i == num_retries - 1) {
      break;
    }

    // sleep between polls; returns -ECANCELED on shutdown
    ret = do_wait();
    if (ret < 0) {
      ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
      return ret;
    }
  }
  ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
  return -ERR_BUSY_RESHARDING;
}
767 | ||
768 | int RGWReshard::process_single_logshard(int logshard_num) | |
769 | { | |
770 | string marker; | |
771 | bool truncated = true; | |
772 | ||
773 | CephContext *cct = store->ctx(); | |
774 | int max_entries = 1000; | |
775 | int max_secs = 60; | |
776 | ||
777 | rados::cls::lock::Lock l(reshard_lock_name); | |
778 | ||
779 | utime_t time(max_secs, 0); | |
780 | l.set_duration(time); | |
781 | ||
782 | char cookie_buf[COOKIE_LEN + 1]; | |
783 | gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1); | |
784 | cookie_buf[COOKIE_LEN] = '\0'; | |
785 | ||
786 | l.set_cookie(cookie_buf); | |
787 | ||
788 | string logshard_oid; | |
789 | get_logshard_oid(logshard_num, &logshard_oid); | |
790 | ||
791 | int ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid); | |
792 | if (ret == -EBUSY) { /* already locked by another processor */ | |
793 | ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl; | |
794 | return ret; | |
795 | } | |
796 | ||
797 | utime_t lock_start_time = ceph_clock_now(); | |
798 | ||
799 | do { | |
800 | std::list<cls_rgw_reshard_entry> entries; | |
801 | ret = list(logshard_num, marker, max_entries, entries, &truncated); | |
802 | if (ret < 0) { | |
803 | ldout(cct, 10) << "cannot list all reshards in logshard oid=" << logshard_oid << dendl; | |
804 | continue; | |
805 | } | |
806 | ||
807 | for(auto& entry: entries) { | |
808 | if(entry.new_instance_id.empty()) { | |
809 | ||
810 | ldout(store->ctx(), 20) << __func__ << " resharding " << entry.bucket_name << dendl; | |
811 | ||
812 | RGWObjectCtx obj_ctx(store); | |
813 | rgw_bucket bucket; | |
814 | RGWBucketInfo bucket_info; | |
815 | map<string, bufferlist> attrs; | |
816 | ||
817 | ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr, | |
818 | &attrs); | |
819 | if (ret < 0) { | |
820 | ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl; | |
821 | return -ret; | |
822 | } | |
823 | ||
824 | RGWBucketReshard br(store, bucket_info, attrs); | |
825 | ||
826 | Formatter* formatter = new JSONFormatter(false); | |
827 | auto formatter_ptr = std::unique_ptr<Formatter>(formatter); | |
828 | ret = br.execute(entry.new_num_shards, max_entries, true,nullptr, formatter, this); | |
829 | if (ret < 0) { | |
830 | ldout (store->ctx(), 0) << __func__ << "ERROR in reshard_bucket " << entry.bucket_name << ":" << | |
831 | cpp_strerror(-ret)<< dendl; | |
832 | return ret; | |
833 | } | |
834 | ||
835 | ldout (store->ctx(), 20) << " removing entry" << entry.bucket_name<< dendl; | |
836 | ||
837 | ret = remove(entry); | |
838 | if (ret < 0) { | |
839 | ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: " | |
840 | << cpp_strerror(-ret) << dendl; | |
841 | return ret; | |
842 | } | |
843 | } | |
844 | utime_t now = ceph_clock_now(); | |
845 | ||
846 | if (now > lock_start_time + max_secs / 2) { /* do you need to renew lock? */ | |
847 | l.set_renew(true); | |
848 | ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid); | |
849 | if (ret == -EBUSY) { /* already locked by another processor */ | |
850 | ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl; | |
851 | return ret; | |
852 | } | |
853 | lock_start_time = now; | |
854 | } | |
855 | entry.get_key(&marker); | |
856 | } | |
857 | } while (truncated); | |
858 | ||
859 | l.unlock(&store->reshard_pool_ctx, logshard_oid); | |
860 | return 0; | |
861 | } | |
862 | ||
863 | ||
864 | void RGWReshard::get_logshard_oid(int shard_num, string *logshard) | |
865 | { | |
866 | char buf[32]; | |
867 | snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num); | |
868 | ||
869 | string objname(reshard_oid_prefix); | |
870 | *logshard = objname + buf; | |
871 | } | |
872 | ||
873 | int RGWReshard::process_all_logshards() | |
874 | { | |
3efd9988 FG |
875 | if (!store->can_reshard()) { |
876 | ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl; | |
877 | return 0; | |
878 | } | |
31f18b77 FG |
879 | int ret = 0; |
880 | ||
881 | for (int i = 0; i < num_logshards; i++) { | |
882 | string logshard; | |
883 | get_logshard_oid(i, &logshard); | |
884 | ||
885 | ldout(store->ctx(), 20) << "proceeding logshard = " << logshard << dendl; | |
886 | ||
887 | ret = process_single_logshard(i); | |
888 | if (ret <0) { | |
889 | return ret; | |
890 | } | |
891 | } | |
892 | ||
893 | return 0; | |
894 | } | |
895 | ||
// True once stop_processor() has initiated shutdown.
bool RGWReshard::going_down()
{
  return down_flag;
}
900 | ||
// Spawn the background worker thread that periodically processes the
// reshard queue.
void RGWReshard::start_processor()
{
  worker = new ReshardWorker(store->ctx(), this);
  worker->create("rgw_reshard");
}
906 | ||
907 | void RGWReshard::stop_processor() | |
908 | { | |
909 | down_flag = true; | |
910 | if (worker) { | |
911 | worker->stop(); | |
912 | worker->join(); | |
913 | } | |
914 | delete worker; | |
224ce89b | 915 | worker = nullptr; |
31f18b77 FG |
916 | } |
917 | ||
// Background thread loop: run process_all_logshards() each round, then
// sleep whatever remains of rgw_reshard_thread_interval, until shutdown.
void *RGWReshard::ReshardWorker::entry() {
  utime_t last_run;
  do {
    utime_t start = ceph_clock_now();
    // NOTE(review): process_all_logshards() returns 0 on success, so this
    // condition is true only on FAILURE, contradicting the comment below;
    // last_run is not read here, but this looks inverted — confirm intent.
    if (reshard->process_all_logshards()) {
      /* All shards have been processed properly. Next time we can start
       * from this moment. */
      last_run = start;
    }

    if (reshard->going_down())
      break;

    // sleep only for the part of the interval not already spent working
    utime_t end = ceph_clock_now();
    end -= start;
    int secs = cct->_conf->rgw_reshard_thread_interval;

    if (secs <= end.sec())
      continue; // next round

    secs -= end.sec();

    lock.Lock();
    cond.WaitInterval(lock, utime_t(secs, 0));  // woken early by stop()
    lock.Unlock();
  } while (!reshard->going_down());

  return NULL;
}
947 | ||
// Wake the worker loop so it can observe going_down() and exit promptly.
void RGWReshard::ReshardWorker::stop()
{
  Mutex::Locker l(lock);
  cond.Signal();
}