class BucketInfoReshardUpdate
{
RGWRados *store;
- RGWBucketInfo bucket_info;
+ RGWBucketInfo& bucket_info;
std::map<string, bufferlist> bucket_attrs;
bool in_progress{false};
BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
- verbose = verbose && (formatter != nullptr);
+ bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);
- if (verbose) {
+ if (verbose_json_out) {
formatter->open_array_section("entries");
}
uint64_t total_entries = 0;
- if (!verbose) {
- cout << "total entries:";
+ if (!verbose_json_out && out) {
+ (*out) << "total entries:";
}
const int num_source_shards =
for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
rgw_cls_bi_entry& entry = *iter;
- if (verbose) {
+ if (verbose_json_out) {
formatter->open_object_section("entry");
encode_json("shard_id", i, formatter);
bool account = entry.get_info(&cls_key, &category, &stats);
rgw_obj_key key(cls_key);
rgw_obj obj(new_bucket_info.bucket, key);
+ RGWMPObj mp;
+ if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
+ // place the multipart .meta object on the same shard as its head object
+ obj.index_hash_source = mp.get_key();
+ }
int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
if (ret < 0) {
lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
}
}
- if (verbose) {
+ if (verbose_json_out) {
formatter->close_section();
- if (out) {
- formatter->flush(*out);
- }
+ formatter->flush(*out);
} else if (out && !(total_entries % 1000)) {
(*out) << " " << total_entries;
}
}
}
- if (verbose) {
+ if (verbose_json_out) {
formatter->close_section();
- if (out) {
- formatter->flush(*out);
- }
+ formatter->flush(*out);
} else if (out) {
(*out) << " " << total_entries << std::endl;
}
RGWBucketReshardLock logshard_lock(store, logshard_oid, false);
int ret = logshard_lock.lock();
- if (ret == -EBUSY) { /* already locked by another processor */
+ if (ret < 0) {
ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " <<
- logshard_oid << dendl;
+ logshard_oid << ", ret = " << ret <<dendl;
return ret;
}
-
+
do {
std::list<cls_rgw_reshard_entry> entries;
ret = list(logshard_num, marker, max_entries, entries, &truncated);
ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name,
bucket_info, nullptr, &attrs);
if (ret < 0) {
- ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " <<
- cpp_strerror(-ret) << dendl;
- return -ret;
+ ldout(cct, 0) << __func__ <<
+ ": Error in get_bucket_info for bucket " << entry.bucket_name <<
+ ": " << cpp_strerror(-ret) << dendl;
+ if (ret != -ENOENT) {
+ // any error other than ENOENT will abort
+ return ret;
+ }
+
+ // we've encountered a reshard queue entry for an apparently
+ // non-existent bucket; let's try to recover by cleaning up
+ ldout(cct, 0) << __func__ <<
+ ": removing reshard queue entry for non-existent bucket " <<
+ entry.bucket_name << dendl;
+
+ ret = remove(entry);
+ if (ret < 0) {
+ ldout(cct, 0) << __func__ <<
+ ": Error removing non-existent bucket " <<
+ entry.bucket_name << " from resharding queue: " <<
+ cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ // we cleaned up, move on to the next entry
+ goto finished_entry;
}
RGWBucketReshard br(store, bucket_info, attrs, nullptr);
-
- Formatter* formatter = new JSONFormatter(false);
- auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
- ret = br.execute(entry.new_num_shards, max_entries, true, nullptr,
- formatter, this);
+ ret = br.execute(entry.new_num_shards, max_entries, false, nullptr,
+ nullptr, this);
if (ret < 0) {
- ldout (store->ctx(), 0) << __func__ <<
- "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
+ ldout(store->ctx(), 0) << __func__ <<
+ ": Error during resharding bucket " << entry.bucket_name << ":" <<
cpp_strerror(-ret)<< dendl;
return ret;
}
- ldout (store->ctx(), 20) << " removing entry" << entry.bucket_name <<
+ ldout(store->ctx(), 20) << __func__ <<
+ " removing reshard queue entry for bucket " << entry.bucket_name <<
dendl;
ret = remove(entry);
if (ret < 0) {
- ldout(cct, 0)<< __func__ << ":Error removing bucket " <<
- entry.bucket_name << " for resharding queue: " <<
+ ldout(cct, 0) << __func__ << ": Error removing bucket " <<
+ entry.bucket_name << " from resharding queue: " <<
cpp_strerror(-ret) << dendl;
return ret;
}
- }
+ } // if new instance id is empty
+
+ finished_entry:
Clock::time_point now = Clock::now();
if (logshard_lock.should_renew(now)) {
}
entry.get_key(&marker);
- }
+ } // entry for loop
} while (truncated);
logshard_lock.unlock();