-// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "include/types.h"
std::vector<std::thread> producers(max_producer_count);
for (auto& p : producers) {
- p = std::move(std::thread([this, &queue_name, &producer_count] {
- test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
- --producer_count;
- }));
+ p = std::thread([this, &queue_name, &producer_count] {
+ test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
+ --producer_count;
+ });
}
auto consume_count = 0U;
ASSERT_EQ(truncated, false);
}
+TEST_F(TestClsQueue, WrapAround)
+{
+ const std::string queue_name = "my-queue";
+ const auto number_of_entries = 10U;
+ const auto max_entry_size = 2000;
+ const auto min_entry_size = 1000;
+ const uint64_t queue_size = number_of_entries*max_entry_size;
+ const auto entry_overhead = 10;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_queue_init(op, queue_name, queue_size);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ std::list<bufferlist> total_bl;
+
+ // fill up the queue
+ for (auto i = 0U; i < number_of_entries; ++i) {
+ const auto entry_size = rand()%(max_entry_size - min_entry_size + 1) + min_entry_size;
+ std::string entry_str(entry_size-entry_overhead, 0);
+ std::generate_n(entry_str.begin(), entry_str.size(), [](){return (char)(rand());});
+ bufferlist entry_bl;
+ entry_bl.append(entry_str);
+ std::vector<bufferlist> data{{entry_bl}};
+ // enqueue vector
+ cls_queue_enqueue(op, 0, data);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ total_bl.push_back(entry_bl);
+ }
+
+ std::string marker;
+ for (auto j = 0; j < 10; ++j) {
+ // empty half+1 of the queue
+ const auto max_elements = number_of_entries/2 + 1;
+ bool truncated;
+ std::string end_marker;
+ std::vector<cls_queue_entry> entries;
+ const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
+ ASSERT_EQ(0, ret);
+ for (auto& entry : entries) {
+ ASSERT_EQ(entry.data, total_bl.front());
+ total_bl.pop_front();
+ }
+ marker = end_marker;
+ cls_queue_remove_entries(op, end_marker);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ // fill half+1 of the queue
+ for (auto i = 0U; i < number_of_entries/2 + 1; ++i) {
+ const auto entry_size = rand()%(max_entry_size - min_entry_size + 1) + min_entry_size;
+ std::string entry_str(entry_size-entry_overhead, 0);
+ std::generate_n(entry_str.begin(), entry_str.size(), [](){return (char)(rand());});
+ bufferlist entry_bl;
+ entry_bl.append(entry_str);
+ std::vector<bufferlist> data{{entry_bl}};
+ cls_queue_enqueue(op, 0, data);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ total_bl.push_back(entry_bl);
+ }
+ }
+}
+