// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "rgw_notify.h"
#include "cls/2pc_queue/cls_2pc_queue_client.h"
#include "cls/lock/cls_lock_client.h"
#include <boost/algorithm/hex.hpp>
#include <boost/context/protected_fixedsize_stack.hpp>
#include <spawn/spawn.hpp>
#include "rgw_pubsub.h"
#include "rgw_pubsub_push.h"
#include "rgw_perf_counters.h"
#include "rgw_sal_rados.h"
#include "common/dout.h"

#define dout_subsys ceph_subsys_rgw
namespace rgw::notify {

struct event_entry_t {
  rgw_pubsub_s3_event event;
  std::string push_endpoint;
  std::string push_endpoint_args;
  std::string arn_topic;

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(event, bl);
    encode(push_endpoint, bl);
    encode(push_endpoint_args, bl);
    encode(arn_topic, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(event, bl);
    decode(push_endpoint, bl);
    decode(push_endpoint_args, bl);
    decode(arn_topic, bl);
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(event_entry_t)
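// Illustrative sketch (editorial note, not part of the manager flow): thanks to
// WRITE_CLASS_ENCODER, an event_entry_t can be round-tripped through a
// bufferlist with the free encode()/decode() helpers, e.g.:
//
//   bufferlist bl;
//   event_entry_t in;
//   in.arn_topic = "my-topic";
//   encode(in, bl);
//   auto p = bl.cbegin();
//   event_entry_t out;
//   decode(out, p);   // out.arn_topic == "my-topic"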
using queues_t = std::set<std::string>;

// use mmap/mprotect to allocate 128k coroutine stacks
auto make_stack_allocator() {
  return boost::context::protected_fixedsize_stack{128*1024};
}
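// Editorial note: boost::context::protected_fixedsize_stack mmaps each
// coroutine stack with a trailing guard page, so a coroutine that overflows
// its 128k stack faults immediately instead of silently corrupting adjacent
// memory; this is presumably why it is used here rather than the plain
// fixedsize_stack.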
class Manager : public DoutPrefixProvider {
  const size_t max_queue_size;
  const uint32_t queues_update_period_ms;
  const uint32_t queues_update_retry_ms;
  const uint32_t queue_idle_sleep_us;
  const utime_t failover_time;
  CephContext* const cct;
  librados::IoCtx& rados_ioctx;
  static constexpr auto COOKIE_LEN = 16;
  const std::string lock_cookie;
  boost::asio::io_context io_context;
  boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard;
  const uint32_t worker_count;
  std::vector<std::thread> workers;
  const uint32_t stale_reservations_period_s;
  const uint32_t reservations_cleanup_period_s;

  const std::string Q_LIST_OBJECT_NAME = "queues_list_object";

  CephContext *get_cct() const override { return cct; }
  unsigned get_subsys() const override { return dout_subsys; }
  std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw notify: "; }
  // read the list of queues from the queue list object
  int read_queue_list(queues_t& queues, optional_yield y) {
    constexpr auto max_chunk = 1024U;
    std::string start_after;

      librados::ObjectReadOperation op;
      queues_t queues_chunk;
      op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
      const auto ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
        // queue list object was not created - nothing to do

        // TODO: do we need to check on rval as well as ret?
        ldpp_dout(this, 1) << "ERROR: failed to read queue list. error: " << ret << dendl;

      queues.merge(queues_chunk);
  // set m1 to be the minimum between m1 and m2
  static int set_min_marker(std::string& m1, const std::string m2) {
    cls_queue_marker mr1;
    cls_queue_marker mr2;
    if (mr1.from_str(m1.c_str()) < 0 || mr2.from_str(m2.c_str()) < 0) {

    if (mr2.gen <= mr1.gen && mr2.offset < mr1.offset) {
  using Clock = ceph::coarse_mono_clock;
  using Executor = boost::asio::io_context::executor_type;
  using Timer = boost::asio::basic_waitable_timer<Clock,
        boost::asio::wait_traits<Clock>, Executor>;

  class tokens_waiter {
    const std::chrono::hours infinite_duration;
    size_t pending_tokens;

      tokens_waiter& waiter;
      token(tokens_waiter& _waiter) : waiter(_waiter) {
        ++waiter.pending_tokens;

        --waiter.pending_tokens;
        if (waiter.pending_tokens == 0) {
          waiter.timer.cancel();

    tokens_waiter(boost::asio::io_context& io_context) :
      infinite_duration(1000),

    void async_wait(spawn::yield_context yield) {
      timer.expires_from_now(infinite_duration);
      boost::system::error_code ec;
      timer.async_wait(yield[ec]);
      ceph_assert(ec == boost::system::errc::operation_canceled);
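  // Usage sketch for the tokens_waiter above (editorial illustration; it mirrors
  // how process_queue() below uses the class): each spawned coroutine keeps a
  // token alive for its lifetime, and async_wait() only returns once the last
  // token is destroyed and the timer is cancelled:
  //
  //   tokens_waiter waiter(io_context);
  //   spawn::spawn(yield, [&](spawn::yield_context yield) {
  //     const auto token = waiter.make_token();
  //     // ... do asynchronous work ...
  //   }, make_stack_allocator());
  //   waiter.async_wait(yield);   // resumes after all tokens are gone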
  // processing of a specific entry
  // return whether processing was successful (true) or not (false)
  bool process_entry(const cls_queue_entry& entry, spawn::yield_context yield) {
    event_entry_t event_entry;
    auto iter = entry.data.cbegin();
      decode(event_entry, iter);
    } catch (buffer::error& err) {
      ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;

    // TODO move endpoint creation to queue level
    const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
        RGWHTTPArgs(event_entry.push_endpoint_args, this),
    ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
      " for entry: " << entry.marker << dendl;
    const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
      ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
        << " failed. error: " << ret << " (will retry)" << dendl;

      ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
      if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);

    } catch (const RGWPubSubEndpoint::configuration_error& e) {
      ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: "
          << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
  // clean stale reservations from the queue
  void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) {
      ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
      const auto now = ceph::coarse_real_time::clock::now();
      const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
      librados::ObjectWriteOperation op;
      rados::cls::lock::assert_locked(&op, queue_name+"_lock",
            ClsLockType::EXCLUSIVE,
      cls_2pc_queue_expire_reservations(op, stale_time);
      // check ownership and do reservation cleanup in one batch
      auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
      if (ret == -ENOENT) {
        ldpp_dout(this, 5) << "INFO: queue: "
          << queue_name << " was removed. cleanup will stop" << dendl;

        ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;

        ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
          << ". error: " << ret << dendl;

      Timer timer(io_context);
      timer.expires_from_now(std::chrono::seconds(reservations_cleanup_period_s));
      boost::system::error_code ec;
      timer.async_wait(yield[ec]);
  // processing of a specific queue
  void process_queue(const std::string& queue_name, spawn::yield_context yield) {
    constexpr auto max_elements = 1024;
    auto is_idle = false;
    const std::string start_marker;

    // start the cleanup coroutine for the queue
    spawn::spawn(io_context, [this, queue_name](spawn::yield_context yield) {
          cleanup_queue(queue_name, yield);
        }, make_stack_allocator());

      // if the queue was empty the last time, sleep for the idle timeout
        Timer timer(io_context);
        timer.expires_from_now(std::chrono::microseconds(queue_idle_sleep_us));
        boost::system::error_code ec;
        timer.async_wait(yield[ec]);

      // get list of entries in the queue
      bool truncated = false;
      std::string end_marker;
      std::vector<cls_queue_entry> entries;
      auto total_entries = 0U;
        librados::ObjectReadOperation op;
        rados::cls::lock::assert_locked(&op, queue_name+"_lock",
              ClsLockType::EXCLUSIVE,
        cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval);
      // check ownership and list entries in one batch
      auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
      if (ret == -ENOENT) {
        ldpp_dout(this, 5) << "INFO: queue: "
          << queue_name << " was removed. processing will stop" << dendl;

        ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;

        ldpp_dout(this, 5) << "WARNING: failed to get list of entries in queue and/or lock queue: "
          << queue_name << ". error: " << ret << " (will retry)" << dendl;

      ret = cls_2pc_queue_list_entries_result(obl, entries, &truncated, end_marker);
        ldpp_dout(this, 5) << "WARNING: failed to parse list of entries in queue: "
          << queue_name << ". error: " << ret << " (will retry)" << dendl;

      total_entries = entries.size();
      if (total_entries == 0) {
        // nothing in the queue

      // log when queue is not idle
      ldpp_dout(this, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name <<
        ". end marker is: " << end_marker << dendl;

      auto has_error = false;
      auto remove_entries = false;
      tokens_waiter waiter(io_context);
      for (auto& entry : entries) {
        // bail out on first error

        // TODO pass entry pointer instead of by-value
        spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](spawn::yield_context yield) {
              const auto token = waiter.make_token();
              if (process_entry(entry, yield)) {
                ldpp_dout(this, 20) << "INFO: processing of entry: " <<
                  entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl;
                remove_entries = true;

              if (set_min_marker(end_marker, entry.marker) < 0) {
                ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;

                ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;

              ldpp_dout(this, 20) << "INFO: processing of entry: " <<
                entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
            }, make_stack_allocator());

      // wait for all pending work to finish
      waiter.async_wait(yield);

      // delete all published entries from queue
      if (remove_entries) {
          librados::ObjectWriteOperation op;
          rados::cls::lock::assert_locked(&op, queue_name+"_lock",
                ClsLockType::EXCLUSIVE,
          cls_2pc_queue_remove_entries(op, end_marker);
        // check ownership and delete entries in one batch
        const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
        if (ret == -ENOENT) {
          ldpp_dout(this, 5) << "INFO: queue: "
            << queue_name << " was removed. processing will stop" << dendl;

          ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;

          ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker << " from queue: "
            << queue_name << ". error: " << ret << dendl;

          ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker << " from queue: "
            << queue_name << dendl;
  // list of owned queues
  using owned_queues_t = std::unordered_set<std::string>;

  // process all queues
  // find which of the queues is owned by this daemon and process it
  void process_queues(spawn::yield_context yield) {
    auto has_error = false;
    owned_queues_t owned_queues;

    // add randomness to the duration between queue checking
    // to make sure that different daemons are not synced
    std::random_device seed;
    std::mt19937 rnd_gen(seed());
    const auto min_jitter = 100; // ms
    const auto max_jitter = 500; // ms
    std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter);

      Timer timer(io_context);
      const auto duration = (has_error ?
        std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) +
        std::chrono::milliseconds(duration_jitter(rnd_gen));
      timer.expires_from_now(duration);
      const auto tp = ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration);
      ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp) << dendl;
      boost::system::error_code ec;
      timer.async_wait(yield[ec]);

      auto ret = read_queue_list(queues, optional_yield(io_context, yield));

      std::vector<std::string> queue_gc;
      std::mutex queue_gc_lock;
      for (const auto& queue_name : queues) {
        // try to lock the queue to check if it is owned by this rgw
        // or if ownership needs to be taken
        librados::ObjectWriteOperation op;
        rados::cls::lock::lock(&op, queue_name+"_lock",
              ClsLockType::EXCLUSIVE,
              "" /*no description*/,
              LOCK_FLAG_MAY_RENEW);

        ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
          // lock is already taken by another RGW
          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
          // if queue was owned by this RGW, processing should be stopped, queue would be deleted from list afterwards

        if (ret == -ENOENT) {
          // queue is deleted - processing will stop the next time we try to read from the queue
          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " should not be locked - already deleted" << dendl;

          // failed to lock for another reason, continue to process other queues
          ldpp_dout(this, 1) << "ERROR: failed to lock queue: " << queue_name << ". error: " << ret << dendl;

        // add queue to list of owned queues
        if (owned_queues.insert(queue_name).second) {
          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
          // start processing this queue
          spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](spawn::yield_context yield) {
                process_queue(queue_name, yield);
                // if queue processing ended, it means that the queue was removed or is not owned anymore
                // mark it for deletion
                std::lock_guard lock_guard(queue_gc_lock);
                queue_gc.push_back(queue_name);
                ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
              }, make_stack_allocator());

          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl;

      // erase all queues that were deleted
        std::lock_guard lock_guard(queue_gc_lock);
        std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
              owned_queues.erase(queue_name);
              ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;

    std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
  // ctor: start all threads
  Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms,
          uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, uint32_t failover_time_ms,
          uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
          uint32_t _worker_count, rgw::sal::RGWRadosStore* store) :
    max_queue_size(_max_queue_size),
    queues_update_period_ms(_queues_update_period_ms),
    queues_update_retry_ms(_queues_update_retry_ms),
    queue_idle_sleep_us(_queue_idle_sleep_us),
    failover_time(std::chrono::milliseconds(failover_time_ms)),
    rados_ioctx(store->getRados()->get_notif_pool_ctx()),
    lock_cookie(gen_rand_alphanumeric(cct, COOKIE_LEN)),
    work_guard(boost::asio::make_work_guard(io_context)),
    worker_count(_worker_count),
    stale_reservations_period_s(_stale_reservations_period_s),
    reservations_cleanup_period_s(_reservations_cleanup_period_s)

    spawn::spawn(io_context, [this](spawn::yield_context yield) {
          process_queues(yield);
        }, make_stack_allocator());

    // start the worker threads to do the actual queue processing
    const std::string WORKER_THREAD_NAME = "notif-worker";
    for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
      workers.emplace_back([this]() { io_context.run(); });
      const auto rc = ceph_pthread_setname(workers.back().native_handle(),
          (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
      ceph_assert(rc == 0);

    ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
  int add_persistent_topic(const std::string& topic_name, optional_yield y) {
    if (topic_name == Q_LIST_OBJECT_NAME) {
      ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;

    librados::ObjectWriteOperation op;
    cls_2pc_queue_init(op, topic_name, max_queue_size);
    auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
    if (ret == -EEXIST) {
      // queue already exists - nothing to do
      ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl;

      // failed to create queue
      ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl;

    std::map<std::string, bufferlist> new_topic{{topic_name, empty_bl}};
    op.omap_set(new_topic);
    ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
      ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl;

    ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list" << dendl;
  int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
    librados::ObjectWriteOperation op;
    auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
    if (ret == -ENOENT) {
      // queue already removed - nothing to do
      ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;

      // failed to remove queue
      ldpp_dout(this, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;

    std::set<std::string> topic_to_remove{{topic_name}};
    op.omap_rm_keys(topic_to_remove);
    ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
      ldpp_dout(this, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;

    ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl;
// note that the manager itself is not a singleton, and multiple instances may co-exist
// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;

constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000;          // 128MB
constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30;          // check queue list every 30 seconds
constexpr uint32_t Q_LIST_RETRY_MSEC = 1000;              // retry every second if queue list update failed
constexpr uint32_t IDLE_TIMEOUT_USEC = 100*1000;          // idle sleep 100ms
constexpr uint32_t FAILOVER_TIME_MSEC = 3*Q_LIST_UPDATE_MSEC; // failover time is 3x the renew time
constexpr uint32_t WORKER_COUNT = 1;                      // 1 worker thread
constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120;     // cleanup reservations that are more than 2 minutes old
constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30;    // reservation cleanup every 30 seconds
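// Editorial note on the defaults above: FAILOVER_TIME_MSEC works out to
// 3 * 30,000 ms = 90 seconds, i.e. a queue whose owning daemon stops renewing
// its lock presumably becomes eligible for takeover by another RGW after
// roughly 90 seconds.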
bool init(CephContext* cct, rgw::sal::RGWRadosStore* store, const DoutPrefixProvider *dpp) {
  // TODO: take conf from CephContext
  s_manager = new Manager(cct, MAX_QUEUE_SIZE,
      Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC,
      IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
      STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
int add_persistent_topic(const std::string& topic_name, optional_yield y) {
  return s_manager->add_persistent_topic(topic_name, y);

int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
  return s_manager->remove_persistent_topic(topic_name, y);
rgw::sal::RGWObject* get_object_with_atttributes(const req_state* s, rgw::sal::RGWObject* obj) {
  // in case of copy obj, the tags and metadata are taken from source
  const auto src_obj = s->src_object ? s->src_object.get() : obj;
  if (src_obj->get_attrs().empty()) {
    if (!src_obj->get_bucket()) {
      src_obj->set_bucket(s->bucket.get());

    if (src_obj->get_obj_attrs(s->obj_ctx, s->yield, s) < 0) {
void metadata_from_attributes(const req_state* s, rgw::sal::RGWObject* obj, KeyValueMap& metadata) {
  const auto src_obj = get_object_with_atttributes(s, obj);

  for (auto& attr : src_obj->get_attrs()) {
    if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
      std::string_view key(attr.first);
      key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1);
      // we want to pass a null terminated version
      // of the bufferlist, hence "to_str().c_str()"
      metadata.emplace(key, attr.second.to_str().c_str());
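// Worked example (editorial note; assumes the usual RGW xattr prefixes, where
// RGW_ATTR_PREFIX is "user.rgw." and RGW_ATTR_META_PREFIX is
// "user.rgw.x-amz-meta-"): an object xattr stored as
// "user.rgw.x-amz-meta-color" -> "blue" matches the starts_with() check above,
// the "user.rgw." prefix is stripped, and the event metadata ends up holding
// the pair ("x-amz-meta-color", "blue").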
void tags_from_attributes(const req_state* s, rgw::sal::RGWObject* obj, KeyValueMap& tags) {
  const auto src_obj = get_object_with_atttributes(s, obj);

  const auto& attrs = src_obj->get_attrs();
  const auto attr_iter = attrs.find(RGW_ATTR_TAGS);
  if (attr_iter != attrs.end()) {
    auto bliter = attr_iter->second.cbegin();
      ::decode(obj_tags, bliter);
    } catch(buffer::error&) {
      // not able to decode tags

    tags = std::move(obj_tags.get_tags());
// populate event from request
void populate_event_from_request(const req_state *s,
    rgw::sal::RGWObject* obj,
    uint64_t size,
    const ceph::real_time& mtime,
    const std::string& etag,
    EventType event_type,
    rgw_pubsub_s3_event& event) {
  event.eventTime = mtime;
  event.eventName = to_string(event_type);
  event.userIdentity = s->user->get_id().id;  // user that triggered the change
  event.x_amz_request_id = s->req_id;         // request ID of the original change
  event.x_amz_id_2 = s->host_id;              // RGW on which the change was made
  // configurationId is filled from notification configuration
  event.bucket_name = s->bucket_name;
  event.bucket_ownerIdentity = s->bucket_owner.get_id().id;
  event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key()));
  event.object_key = obj->get_name();
  event.object_size = size;
  event.object_etag = etag;
  event.object_versionId = obj->get_instance();
  // use timestamp as per key sequence id (hex encoded)
  const utime_t ts(real_clock::now());
  boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
      std::back_inserter(event.object_sequencer));
  set_event_id(event.id, etag, ts);
  event.bucket_id = s->bucket->get_bucket_id();

  if (s->info.x_meta_map.empty()) {
    // try to fetch the metadata from the attributes
    metadata_from_attributes(s, obj, event.x_meta_map);

    event.x_meta_map = s->info.x_meta_map;

  if (s->tagset.get_tags().empty()) {
    // try to fetch the tags from the attributes
    tags_from_attributes(s, obj, event.tags);

    event.tags = s->tagset.get_tags();

  // opaque data will be filled from topic configuration
bool notification_match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::RGWObject* obj,
    EventType event, const RGWObjTags* req_tags) {
  if (!match(filter.events, event)) {

  if (!match(filter.s3_filter.key_filter, obj->get_name())) {

  if (!filter.s3_filter.metadata_filter.kv.empty()) {
    // metadata filter exists
    if (!s->info.x_meta_map.empty()) {
      // metadata was cached in req_state
      if (!match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {

      // try to fetch the metadata from the attributes
      KeyValueMap metadata;
      metadata_from_attributes(s, obj, metadata);
      if (!match(filter.s3_filter.metadata_filter, metadata)) {

  if (!filter.s3_filter.tag_filter.kv.empty()) {
      // tags in the request
      if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) {

    } else if (!s->tagset.get_tags().empty()) {
      // tags were cached in req_state
      if (!match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {

      // try to fetch tags from the attributes
      tags_from_attributes(s, obj, tags);
      if (!match(filter.s3_filter.tag_filter, tags)) {
int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
    reservation_t& res,
    const RGWObjTags* req_tags)
  RGWPubSub ps(res.store, res.s->user->get_id().tenant);
  RGWPubSub::Bucket ps_bucket(&ps, res.s->bucket->get_key());
  rgw_pubsub_bucket_topics bucket_topics;
  auto rc = ps_bucket.get_topics(&bucket_topics);
    // failed to fetch bucket topics

  for (const auto& bucket_topic : bucket_topics.topics) {
    const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
    const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
    if (!notification_match(topic_filter, res.s, res.object, event_type, req_tags)) {
      // notification does not apply to req_state

    ldpp_dout(dpp, 20) << "INFO: notification: '" << topic_filter.s3_id <<
        "' on topic: '" << topic_cfg.dest.arn_topic <<
        "' and bucket: '" << res.s->bucket->get_name() <<
        "' (unique topic: '" << topic_cfg.name <<
        "') apply to event of type: '" << to_string(event_type) << "'" << dendl;

    cls_2pc_reservation::id_t res_id;
    if (topic_cfg.dest.persistent) {
      // TODO: take default reservation size from conf
      constexpr auto DEFAULT_RESERVATION = 4*1024U; // 4K
      res.size = DEFAULT_RESERVATION;
      librados::ObjectWriteOperation op;
      const auto& queue_name = topic_cfg.dest.arn_topic;
      cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
      auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
          queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC);
        ldpp_dout(dpp, 1) << "ERROR: failed to reserve notification on queue: " << queue_name
            << ". error: " << ret << dendl;
        // if no space is left in the queue we ask the client to slow down
        return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;

      ret = cls_2pc_queue_reserve_result(obl, res_id);
        ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;

    res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id);
int publish_commit(rgw::sal::RGWObject* obj,
    uint64_t size,
    const ceph::real_time& mtime,
    const std::string& etag,
    EventType event_type,
    reservation_t& res,
    const DoutPrefixProvider *dpp)
  for (auto& topic : res.topics) {
    if (topic.cfg.dest.persistent && topic.res_id == cls_2pc_reservation::NO_ID) {
      // nothing to commit or already committed/aborted

    event_entry_t event_entry;
    populate_event_from_request(res.s, obj, size, mtime, etag, event_type, event_entry.event);
    event_entry.event.configurationId = topic.configurationId;
    event_entry.event.opaque_data = topic.cfg.opaque_data;
    if (topic.cfg.dest.persistent) {
      event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
      event_entry.push_endpoint_args = std::move(topic.cfg.dest.push_endpoint_args);
      event_entry.arn_topic = std::move(topic.cfg.dest.arn_topic);

      encode(event_entry, bl);
      const auto& queue_name = topic.cfg.dest.arn_topic;
      if (bl.length() > res.size) {
        // try to make a larger reservation, fail only if this is not possible
        ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length() << " exceeded reserved size: " << res.size <<
          ". trying to make a larger reservation on queue: " << queue_name << dendl;
        // first cancel the existing reservation
        librados::ObjectWriteOperation op;
        cls_2pc_queue_abort(op, topic.res_id);
        auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
            topic.cfg.dest.arn_topic, &op,
          ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id <<
            " when trying to make a larger reservation on queue: " << queue_name
            << ". error: " << ret << dendl;

        // now try to make a bigger one
        cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
        ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
            queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC);
          ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: " << queue_name
            << ". error: " << ret << dendl;
          return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;

        ret = cls_2pc_queue_reserve_result(obl, topic.res_id);
          ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for extra space. error: " << ret << dendl;

      std::vector<bufferlist> bl_data_vec{std::move(bl)};
      librados::ObjectWriteOperation op;
      cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
      const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
      topic.res_id = cls_2pc_reservation::NO_ID;
        ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: " << queue_name
          << ". error: " << ret << dendl;

    // TODO add endpoint LRU cache
    const auto push_endpoint = RGWPubSubEndpoint::create(topic.cfg.dest.push_endpoint,
        topic.cfg.dest.arn_topic,
        RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp),
    ldpp_dout(dpp, 20) << "INFO: push endpoint created: " << topic.cfg.dest.push_endpoint << dendl;
    const auto ret = push_endpoint->send_to_completion_async(res.s->cct, event_entry.event, res.s->yield);
      ldpp_dout(dpp, 1) << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint << " failed. error: " << ret << dendl;
      if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);

    if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
    } catch (const RGWPubSubEndpoint::configuration_error& e) {
      ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: "
          << topic.cfg.dest.push_endpoint << ". error: " << e.what() << dendl;
      if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
int publish_abort(const DoutPrefixProvider *dpp, reservation_t& res) {
  for (auto& topic : res.topics) {
    if (!topic.cfg.dest.persistent || topic.res_id == cls_2pc_reservation::NO_ID) {
      // nothing to abort or already committed/aborted

    const auto& queue_name = topic.cfg.dest.arn_topic;
    librados::ObjectWriteOperation op;
    cls_2pc_queue_abort(op, topic.res_id);
    const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
      ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id <<
        " from queue: " << queue_name << ". error: " << ret << dendl;

    topic.res_id = cls_2pc_reservation::NO_ID;
reservation_t::~reservation_t() {
  publish_abort(dpp, *this);
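// Editorial summary (sketch of the intended call flow, inferred from the
// functions above): a write path first calls publish_reserve() to reserve
// space on every matching persistent topic queue, performs the object write,
// and then calls publish_commit() to encode the event and commit it to the
// queue (or push it synchronously for non-persistent topics). If the caller
// never commits, the reservation_t destructor above calls publish_abort(),
// releasing any outstanding 2pc reservations.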