data_notifier = new RGWDataNotifier(this);
data_notifier->start();
+ binfo_cache = new RGWChainedCacheImpl<bucket_info_entry>;
+ binfo_cache->init(svc.cache);
+
lc = new RGWLC();
lc->initialize(cct, this);
}
ldout(cct, 20) << __func__ << " bucket index max shards: " << bucket_index_max_shards << dendl;
- binfo_cache = new RGWChainedCacheImpl<bucket_info_entry>;
- binfo_cache->init(svc.cache);
-
bool need_tombstone_cache = !svc.zone->get_zone_data_notify_to_map().empty(); /* have zones syncing from us */
if (need_tombstone_cache) {
vector<rgw_raw_obj>::iterator riter;
/* rollback reference */
+ string ref_tag = tag + '\0';
for (riter = ref_objs.begin(); riter != ref_objs.end(); ++riter) {
ObjectWriteOperation op;
- cls_refcount_put(op, tag, true);
+ cls_refcount_put(op, ref_tag, true);
ref.ioctx.locator_set_key(riter->loc);
state->attrset.erase(iter->first);
}
}
+
for (iter = attrs.begin(); iter != attrs.end(); ++iter) {
state->attrset[iter->first] = iter->second;
}
+
+ auto iter = state->attrset.find(RGW_ATTR_ID_TAG);
+ if (iter != state->attrset.end()) {
+ iter->second = state->obj_tag;
+ }
}
return 0;
return 0;
}
+// Decode an encoded RGWOLHInfo from bl into *olh.
+// Returns 0 on success. If decoding throws buffer::error, the failure is
+// logged and -EIO is returned; the exception does not escape to the caller.
+static int decode_olh_info(CephContext* cct, const bufferlist& bl, RGWOLHInfo *olh)
+{
+  try {
+    auto biter = bl.cbegin();
+    decode(*olh, biter);
+    return 0;
+  } catch (const buffer::error&) {
+    // the exception object itself is not inspected; only the failure is reported
+    ldout(cct, 0) << "ERROR: failed to decode olh info" << dendl;
+    return -EIO;
+  }
+}
+
int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
bufferlist& olh_tag, map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log,
uint64_t *plast_ver, rgw_zone_set* zones_trace)
map<uint64_t, vector<rgw_bucket_olh_log_entry> >::iterator iter = log.begin();
op.cmpxattr(RGW_ATTR_OLH_ID_TAG, CEPH_OSD_CMPXATTR_OP_EQ, olh_tag);
- op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_GT, last_ver);
+ op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_GTE, last_ver);
bufferlist ver_bl;
string last_ver_s = to_string(last_ver);
op.mtime2(&mtime_ts);
bool need_to_link = false;
+ uint64_t link_epoch = 0;
cls_rgw_obj_key key;
bool delete_marker = false;
list<cls_rgw_obj_key> remove_instances;
bool need_to_remove = false;
+ // decode current epoch and instance
+ auto olh_ver = state.attrset.find(RGW_ATTR_OLH_VER);
+ if (olh_ver != state.attrset.end()) {
+ std::string str = olh_ver->second.to_str();
+ std::string err;
+ link_epoch = strict_strtoll(str.c_str(), 10, &err);
+ }
+ auto olh_info = state.attrset.find(RGW_ATTR_OLH_INFO);
+ if (olh_info != state.attrset.end()) {
+ RGWOLHInfo info;
+ int r = decode_olh_info(cct, olh_info->second, &info);
+ if (r < 0) {
+ return r;
+ }
+ info.target.key.get_index_key(&key);
+ delete_marker = info.removed;
+ }
+
for (iter = log.begin(); iter != log.end(); ++iter) {
vector<rgw_bucket_olh_log_entry>::iterator viter = iter->second.begin();
for (; viter != iter->second.end(); ++viter) {
rgw_bucket_olh_log_entry& entry = *viter;
- ldout(cct, 20) << "olh_log_entry: op=" << (int)entry.op
+ ldout(cct, 20) << "olh_log_entry: epoch=" << iter->first << " op=" << (int)entry.op
<< " key=" << entry.key.name << "[" << entry.key.instance << "] "
<< (entry.delete_marker ? "(delete)" : "") << dendl;
switch (entry.op) {
remove_instances.push_back(entry.key);
break;
case CLS_RGW_OLH_OP_LINK_OLH:
- need_to_link = true;
- need_to_remove = false;
- key = entry.key;
- delete_marker = entry.delete_marker;
+ // only overwrite a link of the same epoch if the new key's instance sorts before the current one
+ if (link_epoch < iter->first || key.instance.empty() ||
+ key.instance > entry.key.instance) {
+ ldout(cct, 20) << "apply_olh_log applying key=" << entry.key << " epoch=" << iter->first << " delete_marker=" << entry.delete_marker
+ << " over current=" << key << " epoch=" << link_epoch << " delete_marker=" << delete_marker << dendl;
+ need_to_link = true;
+ need_to_remove = false;
+ key = entry.key;
+ delete_marker = entry.delete_marker;
+ } else {
+ ldout(cct, 20) << "apply_olh skipping key=" << entry.key<< " epoch=" << iter->first << " delete_marker=" << entry.delete_marker
+ << " before current=" << key << " epoch=" << link_epoch << " delete_marker=" << delete_marker << dendl;
+ }
break;
case CLS_RGW_OLH_OP_UNLINK_OLH:
need_to_remove = true;
// Read the xattrs of obj with a single read operation and decode the
// RGW_ATTR_OLH_INFO attribute into *olh.
// Returns the obj_operate() error if the read fails, -EINVAL if the
// RGW_ATTR_OLH_INFO attribute is not present, and otherwise whatever
// decode_olh_info() returns.
int RGWRados::get_olh(const RGWBucketInfo& bucket_info, const rgw_obj& obj, RGWOLHInfo *olh)
{
- map<string, bufferlist> unfiltered_attrset;
+ map<string, bufferlist> attrset;
ObjectReadOperation op;
- op.getxattrs(&unfiltered_attrset, NULL);
+ op.getxattrs(&attrset, NULL); // attrs are looked up directly; no prefix filtering step
- bufferlist outbl;
int r = obj_operate(bucket_info, obj, &op);
-
if (r < 0) {
return r;
}
- map<string, bufferlist> attrset;
-
- rgw_filter_attrset(unfiltered_attrset, RGW_ATTR_OLH_PREFIX, &attrset);
- map<string, bufferlist>::iterator iter = attrset.find(RGW_ATTR_OLH_INFO);
+ auto iter = attrset.find(RGW_ATTR_OLH_INFO);
if (iter == attrset.end()) { /* not an olh: the RGW_ATTR_OLH_INFO xattr is absent */
return -EINVAL;
}
- try {
- auto biter = iter->second.cbegin();
- decode(*olh, biter);
- } catch (buffer::error& err) {
- ldout(cct, 0) << "ERROR: failed to decode olh info" << dendl;
- return -EIO;
- }
-
- return 0;
+ return decode_olh_info(cct, iter->second, olh); // shared decode helper: 0 on success, -EIO on decode failure
}
void RGWRados::check_pending_olh_entries(map<string, bufferlist>& pending_entries,
}
}
- map<string, bufferlist>::iterator iter = state->attrset.find(RGW_ATTR_OLH_INFO);
- ceph_assert(iter != state->attrset.end());
+ auto iter = state->attrset.find(RGW_ATTR_OLH_INFO);
+ if (iter == state->attrset.end()) {
+ return -EINVAL;
+ }
+
RGWOLHInfo olh;
- try {
- auto biter = iter->second.cbegin();
- decode(olh, biter);
- } catch (buffer::error& err) {
- ldout(cct, 0) << "ERROR: failed to decode olh info" << dendl;
- return -EIO;
+ int ret = decode_olh_info(cct, iter->second, &olh);
+ if (ret < 0) {
+ return ret;
}
if (olh.removed) {
map<string, bufferlist> updates;
uint32_t count = 0;
+ int pos = -1;
while (count < num_entries && !candidates.empty()) {
r = 0;
// Select the next one
- int pos = candidates.begin()->second;
+ pos = candidates.begin()->second;
const string& name = vcurrents[pos]->first;
struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
break;
}
}
- if (!m.empty())
- *last_entry = m.rbegin()->first;
+
+ if (pos >= 0)
+ *last_entry = std::move((--vcurrents[pos])->first);
return 0;
}