* read list of objects, skips objects in the ugly namespace
*/
static int get_obj_vals(cls_method_context_t hctx, const string& start, const string& filter_prefix,
- int num_entries, map<string, bufferlist> *pkeys)
+ int num_entries, map<string, bufferlist> *pkeys, bool *pmore)
{
- int ret = cls_cxx_map_get_vals(hctx, start, filter_prefix, num_entries, pkeys);
+ int ret = cls_cxx_map_get_vals(hctx, start, filter_prefix, num_entries, pkeys, pmore);
if (ret < 0)
return ret;
string new_start = c;
/* now get some more keys */
- ret = cls_cxx_map_get_vals(hctx, new_start, filter_prefix, num_entries - pkeys->size(), &new_keys);
+ ret = cls_cxx_map_get_vals(hctx, new_start, filter_prefix, num_entries - pkeys->size(), &new_keys, pmore);
if (ret < 0)
return ret;
string start_key;
encode_list_index_key(hctx, op.start_obj, &start_key);
bool done = false;
- uint32_t left_to_read = op.num_entries + 1;
+ uint32_t left_to_read = op.num_entries;
+ bool more;
do {
- rc = get_obj_vals(hctx, start_key, op.filter_prefix, left_to_read, &keys);
+ rc = get_obj_vals(hctx, start_key, op.filter_prefix, left_to_read, &keys, &more);
if (rc < 0)
return rc;
CLS_LOG(20, "entry %s[%s] is not valid\n", key.name.c_str(), key.instance.c_str());
continue;
}
-
- if (!op.list_versions && !entry.is_visible()) {
+
+ // filter out noncurrent versions, delete markers, and initial marker
+ if (!op.list_versions && (!entry.is_visible() || op.start_obj.name == key.name)) {
CLS_LOG(20, "entry %s[%s] is not visible\n", key.name.c_str(), key.instance.c_str());
continue;
}
}
} while (left_to_read > 0 && !done);
- ret.is_truncated = (left_to_read == 0) && /* we found more entries than we were requested, meaning response is truncated */
- !done;
+ ret.is_truncated = more && !done;
::encode(ret, *out);
return 0;
#define CHECK_CHUNK_SIZE 1000
bool done = false;
+ bool more;
do {
- rc = get_obj_vals(hctx, start_obj, filter_prefix, CHECK_CHUNK_SIZE, &keys);
+ rc = get_obj_vals(hctx, start_obj, filter_prefix, CHECK_CHUNK_SIZE, &keys, &more);
if (rc < 0)
return rc;
return rc;
}
- if (op.log_op) {
+ if (op.log_op && !header.syncstopped) {
rc = log_index_operation(hctx, op.key, op.op, op.tag, entry.meta.mtime,
entry.ver, info.state, header.ver, header.max_marker, op.bilog_flags, NULL, NULL, &op.zones_trace);
if (rc < 0)
bufferlist op_bl;
if (cancel) {
- if (op.log_op) {
+ if (op.log_op && !header.syncstopped) {
rc = log_index_operation(hctx, op.key, op.op, op.tag, entry.meta.mtime, entry.ver,
CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, op.bilog_flags, NULL, NULL, &op.zones_trace);
if (rc < 0)
break;
}
- if (op.log_op) {
+ if (op.log_op && !header.syncstopped) {
rc = log_index_operation(hctx, op.key, op.op, op.tag, entry.meta.mtime, entry.ver,
CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, op.bilog_flags, NULL, NULL, &op.zones_trace);
if (rc < 0)
remove_entry.key.name.c_str(), remove_entry.key.instance.c_str(), remove_entry.meta.category);
unaccount_entry(header, remove_entry);
- if (op.log_op) {
+ if (op.log_op && !header.syncstopped) {
+ ++header.ver; // increment index version, or we'll overwrite keys previously written
rc = log_index_operation(hctx, remove_key, CLS_RGW_OP_DEL, op.tag, remove_entry.meta.mtime,
remove_entry.ver, CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, op.bilog_flags, NULL, NULL, &op.zones_trace);
if (rc < 0)
get_list_index_key(instance_entry, &list_idx);
/* this is the current head, need to update! */
map<string, bufferlist> keys;
+ bool more;
string filter = key.name; /* list key starts with key name, filter it to avoid a case where we cross to
different namespace */
- int ret = cls_cxx_map_get_vals(hctx, list_idx, filter, 1, &keys);
+ int ret = cls_cxx_map_get_vals(hctx, list_idx, filter, 1, &keys, &more);
if (ret < 0) {
return ret;
}
return ret;
}
- if (op.log_op) {
+ if (op.log_op && !header.syncstopped) {
rgw_bucket_dir_entry& entry = obj.get_dir_entry();
rgw_bucket_entry_ver ver;
return ret;
}
- if (op.log_op) {
+ if (op.log_op && !header.syncstopped) {
rgw_bucket_entry_ver ver;
ver.epoch = (op.olh_epoch ? op.olh_epoch : olh.get_epoch());
return 0;
}
-int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int rgw_dir_suggest_changes(cls_method_context_t hctx,
+ bufferlist *in, bufferlist *out)
{
CLS_LOG(1, "rgw_dir_suggest_changes()");
return rc;
}
- timespan tag_timeout(header.tag_timeout ? header.tag_timeout : CEPH_RGW_TAG_TIMEOUT);
+ timespan tag_timeout(
+ std::chrono::seconds(
+ header.tag_timeout ? header.tag_timeout : CEPH_RGW_TAG_TIMEOUT));
bufferlist::iterator in_iter = in->begin();
ret = cls_cxx_map_remove_key(hctx, cur_change_key);
if (ret < 0)
return ret;
- if (log_op && cur_disk.exists) {
+ if (log_op && cur_disk.exists && !header.syncstopped) {
ret = log_index_operation(hctx, cur_disk.key, CLS_RGW_OP_DEL, cur_disk.tag, cur_disk.meta.mtime,
cur_disk.ver, CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, 0, NULL, NULL, NULL);
if (ret < 0) {
}
break;
case CEPH_RGW_UPDATE:
+ if (!cur_disk.exists) {
+ // this update would only have been sent by the rgw client
+ // if the rgw_bucket_dir_entry existed, however between that
+ // check and now the entry has diappeared, so we were likely
+ // in the midst of a delete op, and we will not recreate the
+ // entry
+ CLS_LOG(10,
+ "CEPH_RGW_UPDATE not applied because rgw_bucket_dir_entry"
+ " no longer exists\n");
+ break;
+ }
+
CLS_LOG(10, "CEPH_RGW_UPDATE name=%s instance=%s total_entries: %" PRId64 " -> %" PRId64 "\n",
cur_change.key.name.c_str(), cur_change.key.instance.c_str(), stats.num_entries, stats.num_entries + 1);
+
stats.num_entries++;
stats.total_size += cur_change.meta.accounted_size;
stats.total_size_rounded += cls_rgw_get_rounded_size(cur_change.meta.accounted_size);
ret = cls_cxx_map_set_val(hctx, cur_change_key, &cur_state_bl);
if (ret < 0)
return ret;
- if (log_op) {
+ if (log_op && !header.syncstopped) {
ret = log_index_operation(hctx, cur_change.key, CLS_RGW_OP_ADD, cur_change.tag, cur_change.meta.mtime,
cur_change.ver, CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, 0, NULL, NULL, NULL);
if (ret < 0) {
}
}
break;
- }
- }
-
- }
+ } // switch(op)
+ } // if (cur_disk.pending_map.empty())
+ } // while (!in_iter.end())
if (header_changed) {
return write_bucket_header(hctx, &header);
}
static int list_plain_entries(cls_method_context_t hctx, const string& name, const string& marker, uint32_t max,
- list<rgw_cls_bi_entry> *entries)
+ list<rgw_cls_bi_entry> *entries, bool *pmore)
{
string filter = name;
string start_key = marker;
int count = 0;
map<string, bufferlist> keys;
- do {
- if (count >= (int)max) {
+ int ret = cls_cxx_map_get_vals(hctx, start_key, filter, max, &keys, pmore);
+ if (ret < 0) {
+ return ret;
+ }
+
+ map<string, bufferlist>::iterator iter;
+ for (iter = keys.begin(); iter != keys.end(); ++iter) {
+ if (iter->first >= end_key) {
+ /* past the end of plain namespace */
return count;
}
- keys.clear();
-#define BI_GET_NUM_KEYS 128
- int ret = cls_cxx_map_get_vals(hctx, start_key, filter, BI_GET_NUM_KEYS, &keys);
- if (ret < 0) {
- return ret;
- }
-
- map<string, bufferlist>::iterator iter;
- for (iter = keys.begin(); iter != keys.end(); ++iter) {
- if (iter->first >= end_key) {
- /* past the end of plain namespace */
- return count;
- }
- rgw_cls_bi_entry entry;
- entry.type = PlainIdx;
- entry.idx = iter->first;
- entry.data = iter->second;
+ rgw_cls_bi_entry entry;
+ entry.type = PlainIdx;
+ entry.idx = iter->first;
+ entry.data = iter->second;
- bufferlist::iterator biter = entry.data.begin();
+ bufferlist::iterator biter = entry.data.begin();
- rgw_bucket_dir_entry e;
- try {
- ::decode(e, biter);
- } catch (buffer::error& err) {
- CLS_LOG(0, "ERROR: %s(): failed to decode buffer", __func__);
- return -EIO;
- }
+ rgw_bucket_dir_entry e;
+ try {
+ ::decode(e, biter);
+ } catch (buffer::error& err) {
+ CLS_LOG(0, "ERROR: %s(): failed to decode buffer", __func__);
+ return -EIO;
+ }
- CLS_LOG(20, "%s(): entry.idx=%s e.key.name=%s", __func__, escape_str(entry.idx).c_str(), escape_str(e.key.name).c_str());
+ CLS_LOG(20, "%s(): entry.idx=%s e.key.name=%s", __func__, escape_str(entry.idx).c_str(), escape_str(e.key.name).c_str());
- if (!name.empty() && e.key.name != name) {
- return count;
- }
+ if (!name.empty() && e.key.name != name) {
+ return count;
+ }
- entries->push_back(entry);
- count++;
- if (count >= (int)max) {
- return count;
- }
- start_key = entry.idx;
+ entries->push_back(entry);
+ count++;
+ if (count >= (int)max) {
+ return count;
}
- } while (!keys.empty());
+ start_key = entry.idx;
+ }
return count;
}
static int list_instance_entries(cls_method_context_t hctx, const string& name, const string& marker, uint32_t max,
- list<rgw_cls_bi_entry> *entries)
+ list<rgw_cls_bi_entry> *entries, bool *pmore)
{
cls_rgw_obj_key key(name);
string first_instance_idx;
}
int count = 0;
map<string, bufferlist> keys;
- bool started = true;
- do {
- if (count >= (int)max) {
- return count;
- }
- keys.clear();
-#define BI_GET_NUM_KEYS 128
- int ret;
- if (started) {
- ret = cls_cxx_map_get_val(hctx, start_key, &keys[start_key]);
- if (ret == -ENOENT) {
- ret = cls_cxx_map_get_vals(hctx, start_key, string(), BI_GET_NUM_KEYS, &keys);
- }
- started = false;
- } else {
- ret = cls_cxx_map_get_vals(hctx, start_key, string(), BI_GET_NUM_KEYS, &keys);
- }
+ bufferlist k;
+ int ret = cls_cxx_map_get_val(hctx, start_key, &k);
+ if (ret < 0 && ret != -ENOENT) {
+ return ret;
+ }
+ bool found_first = (ret == 0);
+ if (found_first) {
+ --max;
+ }
+ if (max > 0) {
+ ret = cls_cxx_map_get_vals(hctx, start_key, string(), max, &keys, pmore);
CLS_LOG(20, "%s(): start_key=%s first_instance_idx=%s keys.size()=%d", __func__, escape_str(start_key).c_str(), escape_str(first_instance_idx).c_str(), (int)keys.size());
if (ret < 0) {
return ret;
}
+ }
+ if (found_first) {
+ keys[start_key].claim(k);
+ }
- map<string, bufferlist>::iterator iter;
- for (iter = keys.begin(); iter != keys.end(); ++iter) {
- rgw_cls_bi_entry entry;
- entry.type = InstanceIdx;
- entry.idx = iter->first;
- entry.data = iter->second;
-
- if (!filter.empty() && entry.idx.compare(0, filter.size(), filter) != 0) {
- return count;
- }
+ map<string, bufferlist>::iterator iter;
+ for (iter = keys.begin(); iter != keys.end(); ++iter) {
+ rgw_cls_bi_entry entry;
+ entry.type = InstanceIdx;
+ entry.idx = iter->first;
+ entry.data = iter->second;
- CLS_LOG(20, "%s(): entry.idx=%s", __func__, escape_str(entry.idx).c_str());
+ if (!filter.empty() && entry.idx.compare(0, filter.size(), filter) != 0) {
+ return count;
+ }
- bufferlist::iterator biter = entry.data.begin();
+ CLS_LOG(20, "%s(): entry.idx=%s", __func__, escape_str(entry.idx).c_str());
- rgw_bucket_dir_entry e;
- try {
- ::decode(e, biter);
- } catch (buffer::error& err) {
- CLS_LOG(0, "ERROR: %s(): failed to decode buffer (size=%d)", __func__, entry.data.length());
- return -EIO;
- }
+ bufferlist::iterator biter = entry.data.begin();
- if (!name.empty() && e.key.name != name) {
- return count;
- }
+ rgw_bucket_dir_entry e;
+ try {
+ ::decode(e, biter);
+ } catch (buffer::error& err) {
+ CLS_LOG(0, "ERROR: %s(): failed to decode buffer (size=%d)", __func__, entry.data.length());
+ return -EIO;
+ }
- entries->push_back(entry);
- count++;
- start_key = entry.idx;
+ if (!name.empty() && e.key.name != name) {
+ return count;
}
- } while (!keys.empty());
+
+ entries->push_back(entry);
+ count++;
+ start_key = entry.idx;
+ }
return count;
}
static int list_olh_entries(cls_method_context_t hctx, const string& name, const string& marker, uint32_t max,
- list<rgw_cls_bi_entry> *entries)
+ list<rgw_cls_bi_entry> *entries, bool *pmore)
{
cls_rgw_obj_key key(name);
string first_instance_idx;
}
int count = 0;
map<string, bufferlist> keys;
- bool started = true;
- do {
- if (count >= (int)max) {
- return count;
- }
- keys.clear();
-#define BI_GET_NUM_KEYS 128
- int ret;
- if (started) {
- ret = cls_cxx_map_get_val(hctx, start_key, &keys[start_key]);
- if (ret == -ENOENT) {
- ret = cls_cxx_map_get_vals(hctx, start_key, string(), BI_GET_NUM_KEYS, &keys);
- }
- started = false;
- } else {
- ret = cls_cxx_map_get_vals(hctx, start_key, string(), BI_GET_NUM_KEYS, &keys);
- }
+ int ret;
+ bufferlist k;
+ ret = cls_cxx_map_get_val(hctx, start_key, &k);
+ if (ret < 0 && ret != -ENOENT) {
+ return ret;
+ }
+ bool found_first = (ret == 0);
+ if (found_first) {
+ --max;
+ }
+ if (max > 0) {
+ ret = cls_cxx_map_get_vals(hctx, start_key, string(), max, &keys, pmore);
CLS_LOG(20, "%s(): start_key=%s first_instance_idx=%s keys.size()=%d", __func__, escape_str(start_key).c_str(), escape_str(first_instance_idx).c_str(), (int)keys.size());
if (ret < 0) {
return ret;
}
+ }
- map<string, bufferlist>::iterator iter;
- for (iter = keys.begin(); iter != keys.end(); ++iter) {
- rgw_cls_bi_entry entry;
- entry.type = OLHIdx;
- entry.idx = iter->first;
- entry.data = iter->second;
+ if (found_first) {
+ keys[start_key].claim(k);
+ }
- if (!filter.empty() && entry.idx.compare(0, filter.size(), filter) != 0) {
- return count;
- }
+ map<string, bufferlist>::iterator iter;
+ for (iter = keys.begin(); iter != keys.end(); ++iter) {
+ rgw_cls_bi_entry entry;
+ entry.type = OLHIdx;
+ entry.idx = iter->first;
+ entry.data = iter->second;
- CLS_LOG(20, "%s(): entry.idx=%s", __func__, escape_str(entry.idx).c_str());
+ if (!filter.empty() && entry.idx.compare(0, filter.size(), filter) != 0) {
+ return count;
+ }
- bufferlist::iterator biter = entry.data.begin();
+ CLS_LOG(20, "%s(): entry.idx=%s", __func__, escape_str(entry.idx).c_str());
- rgw_bucket_olh_entry e;
- try {
- ::decode(e, biter);
- } catch (buffer::error& err) {
- CLS_LOG(0, "ERROR: %s(): failed to decode buffer (size=%d)", __func__, entry.data.length());
- return -EIO;
- }
+ bufferlist::iterator biter = entry.data.begin();
- if (!name.empty() && e.key.name != name) {
- return count;
- }
+ rgw_bucket_olh_entry e;
+ try {
+ ::decode(e, biter);
+ } catch (buffer::error& err) {
+ CLS_LOG(0, "ERROR: %s(): failed to decode buffer (size=%d)", __func__, entry.data.length());
+ return -EIO;
+ }
- entries->push_back(entry);
- count++;
- start_key = entry.idx;
+ if (!name.empty() && e.key.name != name) {
+ return count;
}
- } while (!keys.empty());
+
+ entries->push_back(entry);
+ count++;
+ start_key = entry.idx;
+ }
return count;
}
string filter = op.name;
#define MAX_BI_LIST_ENTRIES 1000
- int32_t max = (op.max < MAX_BI_LIST_ENTRIES ? op.max : MAX_BI_LIST_ENTRIES) + 1; /* one extra entry for identifying truncation */
+ int32_t max = (op.max < MAX_BI_LIST_ENTRIES ? op.max : MAX_BI_LIST_ENTRIES);
string start_key = op.marker;
- int ret = list_plain_entries(hctx, op.name, op.marker, max, &op_ret.entries);
+ bool more;
+ int ret = list_plain_entries(hctx, op.name, op.marker, max, &op_ret.entries, &more);
if (ret < 0) {
CLS_LOG(0, "ERROR: %s(): list_plain_entries retured ret=%d", __func__, ret);
return ret;
CLS_LOG(20, "found %d plain entries", count);
- ret = list_instance_entries(hctx, op.name, op.marker, max - count, &op_ret.entries);
- if (ret < 0) {
- CLS_LOG(0, "ERROR: %s(): list_instance_entries retured ret=%d", __func__, ret);
- return ret;
+ if (!more) {
+ ret = list_instance_entries(hctx, op.name, op.marker, max - count, &op_ret.entries, &more);
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: %s(): list_instance_entries retured ret=%d", __func__, ret);
+ return ret;
+ }
+
+ count += ret;
}
- count += ret;
+ if (!more) {
+ ret = list_olh_entries(hctx, op.name, op.marker, max - count, &op_ret.entries, &more);
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: %s(): list_olh_entries retured ret=%d", __func__, ret);
+ return ret;
+ }
- ret = list_olh_entries(hctx, op.name, op.marker, max - count, &op_ret.entries);
- if (ret < 0) {
- CLS_LOG(0, "ERROR: %s(): list_olh_entries retured ret=%d", __func__, ret);
- return ret;
+ count += ret;
}
- count += ret;
-
- op_ret.is_truncated = (count >= max);
+ op_ret.is_truncated = (count >= max) || more;
while (count >= max) {
op_ret.entries.pop_back();
count--;
string filter;
- do {
-#define BI_NUM_KEYS 128
- int ret = cls_cxx_map_get_vals(hctx, start_key, filter, BI_NUM_KEYS, &keys);
- if (ret < 0)
- return ret;
+ int ret = cls_cxx_map_get_vals(hctx, start_key, filter, max_entries, &keys, truncated);
+ if (ret < 0)
+ return ret;
- map<string, bufferlist>::iterator iter = keys.begin();
- if (iter == keys.end())
- break;
+ map<string, bufferlist>::iterator iter = keys.begin();
+ if (iter == keys.end())
+ return 0;
- for (; iter != keys.end(); ++iter) {
- const string& key = iter->first;
- rgw_bi_log_entry e;
+ uint32_t num_keys = keys.size();
- CLS_LOG(0, "bi_log_iterate_entries key=%s bl.length=%d\n", key.c_str(), (int)iter->second.length());
+ for (; iter != keys.end(); ++iter,++i) {
+ const string& key = iter->first;
+ rgw_bi_log_entry e;
- if (key.compare(end_key) > 0)
- return 0;
+ CLS_LOG(0, "bi_log_iterate_entries key=%s bl.length=%d\n", key.c_str(), (int)iter->second.length());
- ret = bi_log_record_decode(iter->second, e);
- if (ret < 0)
- return ret;
+ if (key.compare(end_key) > 0) {
+ key_iter = key;
+ return 0;
+ }
- if (max_entries && (i >= max_entries)) {
- if (truncated)
- *truncated = true;
- key_iter = key;
- return 0;
- }
+ ret = bi_log_record_decode(iter->second, e);
+ if (ret < 0)
+ return ret;
- ret = cb(hctx, key, e, param);
- if (ret < 0)
- return ret;
- i++;
+ ret = cb(hctx, key, e, param);
+ if (ret < 0)
+ return ret;
+ if (i == num_keys - 1) {
+ key_iter = key;
}
- --iter;
- start_key = iter->first;
- } while (true);
+ }
+
return 0;
}
return 0;
}
+static int rgw_bi_log_resync(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ struct rgw_bucket_dir_header header;
+ int rc = read_bucket_header(hctx, &header);
+ if (rc < 0) {
+ CLS_LOG(1, "ERROR: rgw_bucket_complete_op(): failed to read header\n");
+ return rc;
+ }
+
+ bufferlist bl;
+
+ struct rgw_bi_log_entry entry;
+
+ entry.timestamp = real_clock::now();
+ entry.op = RGWModifyOp::CLS_RGW_OP_RESYNC;
+ entry.state = RGWPendingState::CLS_RGW_STATE_COMPLETE;
+
+ string key;
+ bi_log_index_key(hctx, key, entry.id, header.ver);
+
+ ::encode(entry, bl);
+
+ if (entry.id > header.max_marker)
+ header.max_marker = entry.id;
+
+ header.syncstopped = false;
+
+ rc = cls_cxx_map_set_val(hctx, key, &bl);
+ if (rc < 0)
+ return rc;
+
+ return write_bucket_header(hctx, &header);
+}
+
+static int rgw_bi_log_stop(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ struct rgw_bucket_dir_header header;
+ int rc = read_bucket_header(hctx, &header);
+ if (rc < 0) {
+ CLS_LOG(1, "ERROR: rgw_bucket_complete_op(): failed to read header\n");
+ return rc;
+ }
+
+ bufferlist bl;
+
+ struct rgw_bi_log_entry entry;
+
+ entry.timestamp = real_clock::now();
+ entry.op = RGWModifyOp::CLS_RGW_OP_SYNCSTOP;
+ entry.state = RGWPendingState::CLS_RGW_STATE_COMPLETE;
+
+ string key;
+ bi_log_index_key(hctx, key, entry.id, header.ver);
+
+ ::encode(entry, bl);
+
+ if (entry.id > header.max_marker)
+ header.max_marker = entry.id;
+ header.syncstopped = true;
+
+ rc = cls_cxx_map_set_val(hctx, key, &bl);
+ if (rc < 0)
+ return rc;
+
+ return write_bucket_header(hctx, &header);
+}
+
+
static void usage_record_prefix_by_time(uint64_t epoch, string& key)
{
char buf[32];
bool by_user = !user.empty();
uint32_t i = 0;
string user_key;
-
- if (truncated)
- *truncated = false;
+ bool truncated_status = false;
if (!by_user) {
usage_record_prefix_by_time(end, end_key);
start_key = key_iter;
}
- do {
- CLS_LOG(20, "usage_iterate_range start_key=%s", start_key.c_str());
- int ret = cls_cxx_map_get_vals(hctx, start_key, filter_prefix, NUM_KEYS, &keys);
- if (ret < 0)
- return ret;
+ CLS_LOG(20, "usage_iterate_range start_key=%s", start_key.c_str());
+ int ret = cls_cxx_map_get_vals(hctx, start_key, filter_prefix, max_entries, &keys, &truncated_status);
+ if (ret < 0)
+ return ret;
+ if (truncated) {
+ *truncated = truncated_status;
+ }
+
+ map<string, bufferlist>::iterator iter = keys.begin();
+ if (iter == keys.end())
+ return 0;
- map<string, bufferlist>::iterator iter = keys.begin();
- if (iter == keys.end())
- break;
+ uint32_t num_keys = keys.size();
- for (; iter != keys.end(); ++iter) {
- const string& key = iter->first;
- rgw_usage_log_entry e;
+ for (; iter != keys.end(); ++iter,++i) {
+ const string& key = iter->first;
+ rgw_usage_log_entry e;
- if (!by_user && key.compare(end_key) >= 0) {
- CLS_LOG(20, "usage_iterate_range reached key=%s, done", key.c_str());
- return 0;
- }
+ if (!by_user && key.compare(end_key) >= 0) {
+ CLS_LOG(20, "usage_iterate_range reached key=%s, done", key.c_str());
+ key_iter = key;
+ return 0;
+ }
- if (by_user && key.compare(0, user_key.size(), user_key) != 0) {
- CLS_LOG(20, "usage_iterate_range reached key=%s, done", key.c_str());
- return 0;
- }
+ if (by_user && key.compare(0, user_key.size(), user_key) != 0) {
+ CLS_LOG(20, "usage_iterate_range reached key=%s, done", key.c_str());
+ key_iter = key;
+ return 0;
+ }
- ret = usage_record_decode(iter->second, e);
- if (ret < 0)
- return ret;
+ ret = usage_record_decode(iter->second, e);
+ if (ret < 0)
+ return ret;
- if (e.epoch < start)
- continue;
+ if (e.epoch < start)
+ continue;
- /* keys are sorted by epoch, so once we're past end we're done */
- if (e.epoch >= end)
- return 0;
+ /* keys are sorted by epoch, so once we're past end we're done */
+ if (e.epoch >= end)
+ return 0;
- ret = cb(hctx, key, e, param);
- if (ret < 0)
- return ret;
+ ret = cb(hctx, key, e, param);
+ if (ret < 0)
+ return ret;
- i++;
- if (max_entries && (i > max_entries)) {
- CLS_LOG(20, "usage_iterate_range reached max_entries (%d), done", max_entries);
- *truncated = true;
- key_iter = key;
- return 0;
- }
+ if (i == num_keys - 1) {
+ key_iter = key;
+ return 0;
}
- --iter;
- start_key = iter->first;
- } while (true);
+ }
return 0;
}
}
string iter;
- ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.user, iter, 0, NULL, usage_log_trim_cb, NULL);
+ bool more;
+#define MAX_USAGE_TRIM_ENTRIES 128
+ ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.user, iter, MAX_USAGE_TRIM_ENTRIES, &more, usage_log_trim_cb, NULL);
if (ret < 0)
return ret;
+ if (!more && iter.empty())
+ return -ENODATA;
+
return 0;
}
string filter;
- do {
-#define GC_NUM_KEYS 32
- int ret = cls_cxx_map_get_vals(hctx, start_key, filter, GC_NUM_KEYS, &keys);
- if (ret < 0)
- return ret;
+ int ret = cls_cxx_map_get_vals(hctx, start_key, filter, max_entries, &keys, truncated);
+ if (ret < 0)
+ return ret;
- map<string, bufferlist>::iterator iter = keys.begin();
- if (iter == keys.end())
- break;
+ map<string, bufferlist>::iterator iter = keys.begin();
+ if (iter == keys.end())
+ return 0;
- for (; iter != keys.end(); ++iter) {
- const string& key = iter->first;
- cls_rgw_gc_obj_info e;
+ uint32_t num_keys = keys.size();
- CLS_LOG(10, "gc_iterate_entries key=%s\n", key.c_str());
+ for (; iter != keys.end(); ++iter, ++i) {
+ const string& key = iter->first;
+ cls_rgw_gc_obj_info e;
- if (!end_key.empty() && key.compare(end_key) >= 0)
- return 0;
+ CLS_LOG(10, "gc_iterate_entries key=%s\n", key.c_str());
- if (!key_in_index(key, GC_OBJ_TIME_INDEX))
- return 0;
+ if (!end_key.empty() && key.compare(end_key) >= 0)
+ return 0;
- ret = gc_record_decode(iter->second, e);
- if (ret < 0)
- return ret;
+ if (!key_in_index(key, GC_OBJ_TIME_INDEX))
+ return 0;
- if (max_entries && (i >= max_entries)) {
- if (truncated)
- *truncated = true;
- --iter;
- key_iter = iter->first;
- return 0;
- }
+ ret = gc_record_decode(iter->second, e);
+ if (ret < 0)
+ return ret;
- ret = cb(hctx, key, e, param);
- if (ret < 0)
- return ret;
- i++;
+ ret = cb(hctx, key, e, param);
+ if (ret < 0)
+ return ret;
+ if (i == num_keys - 1) {
+ key_iter = key;
}
- --iter;
- start_key = iter->first;
- } while (true);
+ }
+
return 0;
}
}
cls_rgw_gc_list_ret op_ret;
- int ret = gc_list_entries(hctx, op.marker, op.max, op.expired_only,
+#define GC_LIST_ENTRIES_DEFAULT 128
+ int ret = gc_list_entries(hctx, op.marker, (op.max ? op.max : GC_LIST_ENTRIES_DEFAULT), op.expired_only,
op_ret.entries, &op_ret.truncated, op_ret.next_marker);
if (ret < 0)
return ret;
map<string, bufferlist> vals;
string filter_prefix;
- int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, 1, &vals);
+ bool more;
+ int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, 1, &vals, &more);
if (ret < 0)
return ret;
map<string, bufferlist>::iterator it;
bufferlist::iterator iter;
map<string, bufferlist> vals;
string filter_prefix;
- int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, op.max_entries, &vals);
+ int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, op.max_entries, &vals, &op_ret.is_truncated);
if (ret < 0)
return ret;
map<string, bufferlist>::iterator it;
string filter_prefix;
#define MAX_RESHARD_LIST_ENTRIES 1000
/* one extra entry for identifying truncation */
- int32_t max = (op.max < MAX_RESHARD_LIST_ENTRIES ? op.max : MAX_RESHARD_LIST_ENTRIES) + 1;
- int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, max, &vals);
+ int32_t max = (op.max && (op.max < MAX_RESHARD_LIST_ENTRIES) ? op.max : MAX_RESHARD_LIST_ENTRIES);
+ int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, max, &vals, &op_ret.is_truncated);
if (ret < 0)
return ret;
map<string, bufferlist>::iterator it;
}
op_ret.entries.push_back(entry);
}
- op_ret.is_truncated = op.max && (vals.size() > op.max);
::encode(op_ret, *out);
return 0;
}
cls_method_handle_t h_rgw_bi_put_op;
cls_method_handle_t h_rgw_bi_list_op;
cls_method_handle_t h_rgw_bi_log_list_op;
+ cls_method_handle_t h_rgw_bi_log_resync_op;
+ cls_method_handle_t h_rgw_bi_log_stop_op;
cls_method_handle_t h_rgw_dir_suggest_changes;
cls_method_handle_t h_rgw_user_usage_log_add;
cls_method_handle_t h_rgw_user_usage_log_read;
cls_register_cxx_method(h_class, RGW_BI_LOG_TRIM, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_log_trim, &h_rgw_bi_log_list_op);
cls_register_cxx_method(h_class, RGW_DIR_SUGGEST_CHANGES, CLS_METHOD_RD | CLS_METHOD_WR, rgw_dir_suggest_changes, &h_rgw_dir_suggest_changes);
+ cls_register_cxx_method(h_class, "bi_log_resync", CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_log_resync, &h_rgw_bi_log_resync_op);
+ cls_register_cxx_method(h_class, "bi_log_stop", CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_log_stop, &h_rgw_bi_log_stop_op);
+
/* usage logging */
cls_register_cxx_method(h_class, RGW_USER_USAGE_LOG_ADD, CLS_METHOD_RD | CLS_METHOD_WR, rgw_user_usage_log_add, &h_rgw_user_usage_log_add);
cls_register_cxx_method(h_class, RGW_USER_USAGE_LOG_READ, CLS_METHOD_RD, rgw_user_usage_log_read, &h_rgw_user_usage_log_read);