return boost::context::protected_fixedsize_stack{128*1024};
}
+const std::string Q_LIST_OBJECT_NAME = "queues_list_object";
+
class Manager : public DoutPrefixProvider {
const size_t max_queue_size;
const uint32_t queues_update_period_ms;
const uint32_t queue_idle_sleep_us;
const utime_t failover_time;
CephContext* const cct;
- librados::IoCtx& rados_ioctx;
static constexpr auto COOKIE_LEN = 16;
const std::string lock_cookie;
boost::asio::io_context io_context;
std::vector<std::thread> workers;
const uint32_t stale_reservations_period_s;
const uint32_t reservations_cleanup_period_s;
-
- const std::string Q_LIST_OBJECT_NAME = "queues_list_object";
+public:
+ librados::IoCtx& rados_ioctx;
+private:
CephContext *get_cct() const override { return cct; }
unsigned get_subsys() const override { return dout_subsys; }
queue_idle_sleep_us(_queue_idle_sleep_us),
failover_time(std::chrono::milliseconds(failover_time_ms)),
cct(_cct),
- rados_ioctx(store->getRados()->get_notif_pool_ctx()),
lock_cookie(gen_rand_alphanumeric(cct, COOKIE_LEN)),
work_guard(boost::asio::make_work_guard(io_context)),
worker_count(_worker_count),
stale_reservations_period_s(_stale_reservations_period_s),
- reservations_cleanup_period_s(_reservations_cleanup_period_s)
+ reservations_cleanup_period_s(_reservations_cleanup_period_s),
+ rados_ioctx(store->getRados()->get_notif_pool_ctx())
{
spawn::spawn(io_context, [this] (yield_context yield) {
process_queues(yield);
ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list" << dendl;
return 0;
}
-
- int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
- librados::ObjectWriteOperation op;
- op.remove();
- auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
- if (ret == -ENOENT) {
- // queue already removed - nothing to do
- ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
- return 0;
- }
- if (ret < 0) {
- // failed to remove queue
- ldpp_dout(this, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
- return ret;
- }
-
- std::set<std::string> topic_to_remove{{topic_name}};
- op.omap_rm_keys(topic_to_remove);
- ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
- if (ret < 0) {
- ldpp_dout(this, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
- return ret;
- }
- ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl;
- return 0;
- }
};
// singleton manager
return s_manager->add_persistent_topic(topic_name, y);
}
+int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_name, optional_yield y) {
+ librados::ObjectWriteOperation op;
+ op.remove();
+ auto ret = rgw_rados_operate(dpp, rados_ioctx, topic_name, &op, y);
+ if (ret == -ENOENT) {
+ // queue already removed - nothing to do
+ ldpp_dout(dpp, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
+ return 0;
+ }
+ if (ret < 0) {
+ // failed to remove queue
+ ldpp_dout(dpp, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
+ return ret;
+ }
+
+ std::set<std::string> topic_to_remove{{topic_name}};
+ op.omap_rm_keys(topic_to_remove);
+ ret = rgw_rados_operate(dpp, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl;
+ return 0;
+}
+
int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
if (!s_manager) {
return -EAGAIN;
}
- return s_manager->remove_persistent_topic(topic_name, y);
+ return remove_persistent_topic(s_manager, s_manager->rados_ioctx, topic_name, y);
}
rgw::sal::Object* get_object_with_atttributes(
return src_obj;
}
+static inline void filter_amz_meta(meta_map_t& dest, const meta_map_t& src) {
+ std::copy_if(src.cbegin(), src.cend(),
+ std::inserter(dest, dest.end()),
+ [](const auto& m) {
+ return (boost::algorithm::starts_with(m.first, RGW_AMZ_META_PREFIX));
+ });
+}
+
+
static inline void metadata_from_attributes(
reservation_t& res, rgw::sal::Object* obj) {
auto& metadata = res.x_meta_map;
event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made
// configurationId is filled from notification configuration
event.bucket_name = res.bucket->get_name();
- event.bucket_ownerIdentity = res.bucket->get_owner() ? res.bucket->get_owner()->get_id().id : "";
+ event.bucket_ownerIdentity = res.bucket->get_owner() ?
+ res.bucket->get_owner()->get_id().id : res.bucket->get_info().owner.id;
const auto region = res.store->get_zone()->get_zonegroup().get_api_name();
rgw::ARN bucket_arn(res.bucket->get_key());
bucket_arn.region = region;
if (!filter.s3_filter.metadata_filter.kv.empty()) {
// metadata filter exists
if (res.s) {
- res.x_meta_map = res.s->info.x_meta_map;
+ filter_amz_meta(res.x_meta_map, res.s->info.x_meta_map);
}
metadata_from_attributes(res, obj);
if (!match(filter.s3_filter.metadata_filter, res.x_meta_map)) {
object(_object), src_object(_src_object), bucket(_s->bucket.get()),
object_name(_object_name),
tagset(_s->tagset),
- x_meta_map(_s->info.x_meta_map),
metadata_fetched_from_attributes(false),
user_id(_s->user->get_id().id),
user_tenant(_s->user->get_id().tenant),
req_id(_s->req_id),
yield(y)
-{}
+{
+ filter_amz_meta(x_meta_map, _s->info.x_meta_map);
+}
reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
rgw::sal::RadosStore* _store,