1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "rgw_notify.h"
5 #include "cls/2pc_queue/cls_2pc_queue_client.h"
6 #include "cls/lock/cls_lock_client.h"
8 #include <boost/algorithm/hex.hpp>
9 #include <boost/context/protected_fixedsize_stack.hpp>
10 #include <spawn/spawn.hpp>
11 #include "rgw_sal_rados.h"
12 #include "rgw_pubsub.h"
13 #include "rgw_pubsub_push.h"
14 #include "rgw_perf_counters.h"
15 #include "common/dout.h"
18 #define dout_subsys ceph_subsys_rgw
20 namespace rgw::notify
{
// A single notification event together with the delivery configuration
// (endpoint URI, endpoint args and topic ARN) needed to push it later
// from the persistent queue.
22 struct event_entry_t
{
23 rgw_pubsub_s3_event event
;
24 std::string push_endpoint
;
25 std::string push_endpoint_args
;
26 std::string arn_topic
;
// Serialize the entry (versioned encoding, v1).
28 void encode(bufferlist
& bl
) const {
29 ENCODE_START(1, 1, bl
);
31 encode(push_endpoint
, bl
);
32 encode(push_endpoint_args
, bl
);
33 encode(arn_topic
, bl
);
// Deserialize the entry; must mirror encode() field order.
37 void decode(bufferlist::const_iterator
& bl
) {
40 decode(push_endpoint
, bl
);
41 decode(push_endpoint_args
, bl
);
42 decode(arn_topic
, bl
);
46 WRITE_CLASS_ENCODER(event_entry_t
)
// Set of queue (topic) names read from the queue list object.
48 using queues_t
= std::set
<std::string
>;
50 // use mmap/mprotect to allocate 128k coroutine stacks
51 auto make_stack_allocator() {
52 return boost::context::protected_fixedsize_stack
{128*1024};
// RADOS object whose omap holds the list of persistent queues.
55 const std::string Q_LIST_OBJECT_NAME
= "queues_list_object";
// Owns the io_context, worker threads and per-queue coroutines that
// deliver persistent notifications. Implements DoutPrefixProvider so it
// can be passed as the logging prefix to rgw_rados_operate().
57 class Manager
: public DoutPrefixProvider
{
58 const size_t max_queue_size
;
59 const uint32_t queues_update_period_ms
;
60 const uint32_t queues_update_retry_ms
;
61 const uint32_t queue_idle_sleep_us
;
62 const utime_t failover_time
;
63 CephContext
* const cct
;
// length of the random lock cookie identifying this daemon's locks
64 static constexpr auto COOKIE_LEN
= 16;
65 const std::string lock_cookie
;
66 boost::asio::io_context io_context
;
// keeps io_context.run() alive while there is no posted work
67 boost::asio::executor_work_guard
<boost::asio::io_context::executor_type
> work_guard
;
68 const uint32_t worker_count
;
69 std::vector
<std::thread
> workers
;
70 const uint32_t stale_reservations_period_s
;
71 const uint32_t reservations_cleanup_period_s
;
73 librados::IoCtx
& rados_ioctx
;
// DoutPrefixProvider interface
76 CephContext
*get_cct() const override
{ return cct
; }
77 unsigned get_subsys() const override
{ return dout_subsys
; }
78 std::ostream
& gen_prefix(std::ostream
& out
) const override
{ return out
<< "rgw notify: "; }
80 // read the list of queues from the queue list object
// Reads omap keys of Q_LIST_OBJECT_NAME in chunks of max_chunk and
// merges them into 'queues'.
81 int read_queue_list(queues_t
& queues
, optional_yield y
) {
82 constexpr auto max_chunk
= 1024U;
83 std::string start_after
;
87 librados::ObjectReadOperation op
;
88 queues_t queues_chunk
;
89 op
.omap_get_keys2(start_after
, max_chunk
, &queues_chunk
, &more
, &rval
);
90 const auto ret
= rgw_rados_operate(this, rados_ioctx
, Q_LIST_OBJECT_NAME
, &op
, nullptr, y
);
92 // queue list object was not created - nothing to do
96 // TODO: do we need to check on rval as well as ret?
97 ldpp_dout(this, 1) << "ERROR: failed to read queue list. error: " << ret
<< dendl
;
100 queues
.merge(queues_chunk
);
105 // set m1 to be the minimum between m1 and m2
// Both strings are parsed as cls_queue_marker (gen:offset); parse
// failure of either is reported to the caller.
106 static int set_min_marker(std::string
& m1
, const std::string m2
) {
107 cls_queue_marker mr1
;
108 cls_queue_marker mr2
;
109 if (mr1
.from_str(m1
.c_str()) < 0 || mr2
.from_str(m2
.c_str()) < 0) {
112 if (mr2
.gen
<= mr1
.gen
&& mr2
.offset
< mr1
.offset
) {
// Timer types used throughout the manager: coarse monotonic clock on
// the io_context's executor.
118 using Clock
= ceph::coarse_mono_clock
;
119 using Executor
= boost::asio::io_context::executor_type
;
120 using Timer
= boost::asio::basic_waitable_timer
<Clock
,
121 boost::asio::wait_traits
<Clock
>, Executor
>;
// Lets a coroutine wait until all outstanding work tokens are
// released: each in-flight task holds a RAII token; the last token's
// destructor cancels the (effectively infinite) timer that
// async_wait() blocks on.
123 class tokens_waiter
{
124 const std::chrono::hours infinite_duration
;
125 size_t pending_tokens
;
// RAII token: increments pending_tokens on construction and
// decrements on destruction, waking the waiter when it hits zero.
129 tokens_waiter
& waiter
;
130 token(tokens_waiter
& _waiter
) : waiter(_waiter
) {
131 ++waiter
.pending_tokens
;
135 --waiter
.pending_tokens
;
136 if (waiter
.pending_tokens
== 0) {
137 waiter
.timer
.cancel();
144 tokens_waiter(boost::asio::io_context
& io_context
) :
145 infinite_duration(1000),
// Suspends the coroutine until all tokens are released; expects the
// wait to end via timer cancellation, never expiry.
149 void async_wait(yield_context yield
) {
150 if (pending_tokens
== 0) {
153 timer
.expires_from_now(infinite_duration
);
154 boost::system::error_code ec
;
155 timer
.async_wait(yield
[ec
]);
156 ceph_assert(ec
== boost::system::errc::operation_canceled
);
164 // processing of a specific entry
165 // return whether processing was successful (true) or not (false)
// Decodes the queued event_entry_t, creates its push endpoint and
// delivers the event synchronously (from the coroutine's point of
// view); a false return leaves the entry in the queue for retry.
166 bool process_entry(const cls_queue_entry
& entry
, yield_context yield
) {
167 event_entry_t event_entry
;
168 auto iter
= entry
.data
.cbegin();
170 decode(event_entry
, iter
);
171 } catch (buffer::error
& err
) {
172 ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err
.what() << dendl
;
176 // TODO move endpoint creation to queue level
177 const auto push_endpoint
= RGWPubSubEndpoint::create(event_entry
.push_endpoint
, event_entry
.arn_topic
,
178 RGWHTTPArgs(event_entry
.push_endpoint_args
, this),
180 ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry
.push_endpoint
<<
181 " for entry: " << entry
.marker
<< dendl
;
182 const auto ret
= push_endpoint
->send_to_completion_async(cct
, event_entry
.event
, optional_yield(io_context
, yield
));
184 ldpp_dout(this, 5) << "WARNING: push entry: " << entry
.marker
<< " to endpoint: " << event_entry
.push_endpoint
185 << " failed. error: " << ret
<< " (will retry)" << dendl
;
188 ldpp_dout(this, 20) << "INFO: push entry: " << entry
.marker
<< " to endpoint: " << event_entry
.push_endpoint
190 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_ok
);
193 } catch (const RGWPubSubEndpoint::configuration_error
& e
) {
194 ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: "
195 << event_entry
.push_endpoint
<< " for entry: " << entry
.marker
<< ". error: " << e
.what() << " (will retry) " << dendl
;
200 // clean stale reservation from queue
// Periodic coroutine: expires 2pc reservations older than
// stale_reservations_period_s, then sleeps for
// reservations_cleanup_period_s. Stops if the queue was removed or
// its lock was taken over by another daemon.
201 void cleanup_queue(const std::string
& queue_name
, yield_context yield
) {
203 ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name
<< dendl
;
204 const auto now
= ceph::coarse_real_time::clock::now();
205 const auto stale_time
= now
- std::chrono::seconds(stale_reservations_period_s
);
206 librados::ObjectWriteOperation op
;
// assert we still hold the queue's exclusive lock in the same op
208 rados::cls::lock::assert_locked(&op
, queue_name
+"_lock",
209 ClsLockType::EXCLUSIVE
,
212 cls_2pc_queue_expire_reservations(op
, stale_time
);
213 // check ownership and do reservation cleanup in one batch
214 auto ret
= rgw_rados_operate(this, rados_ioctx
, queue_name
, &op
, optional_yield(io_context
, yield
));
215 if (ret
== -ENOENT
) {
217 ldpp_dout(this, 5) << "INFO: queue: "
218 << queue_name
<< ". was removed. cleanup will stop" << dendl
;
222 ldpp_dout(this, 5) << "WARNING: queue: " << queue_name
<< " ownership moved to another daemon. processing will stop" << dendl
;
226 ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
227 << ". error: " << ret
<< dendl
;
// sleep until the next cleanup round
229 Timer
timer(io_context
);
230 timer
.expires_from_now(std::chrono::seconds(reservations_cleanup_period_s
));
231 boost::system::error_code ec
;
232 timer
.async_wait(yield
[ec
]);
236 // processing of a specific queue
// Main per-queue coroutine: lists up to max_elements entries (while
// asserting lock ownership), spawns one coroutine per entry to push
// it, waits for all of them via tokens_waiter, then removes the
// successfully delivered prefix of entries up to the minimal marker.
237 void process_queue(const std::string
& queue_name
, yield_context yield
) {
238 constexpr auto max_elements
= 1024;
239 auto is_idle
= false;
240 const std::string start_marker
;
242 // start the cleanup coroutine for the queue
243 spawn::spawn(io_context
, [this, queue_name
](yield_context yield
) {
244 cleanup_queue(queue_name
, yield
);
245 }, make_stack_allocator());
248 // if queue was empty the last time, sleep for idle timeout
250 Timer
timer(io_context
);
251 timer
.expires_from_now(std::chrono::microseconds(queue_idle_sleep_us
));
252 boost::system::error_code ec
;
253 timer
.async_wait(yield
[ec
]);
256 // get list of entries in the queue
258 bool truncated
= false;
259 std::string end_marker
;
260 std::vector
<cls_queue_entry
> entries
;
261 auto total_entries
= 0U;
263 librados::ObjectReadOperation op
;
// assert lock ownership and list entries in a single rados op
267 rados::cls::lock::assert_locked(&op
, queue_name
+"_lock",
268 ClsLockType::EXCLUSIVE
,
271 cls_2pc_queue_list_entries(op
, start_marker
, max_elements
, &obl
, &rval
);
272 // check ownership and list entries in one batch
273 auto ret
= rgw_rados_operate(this, rados_ioctx
, queue_name
, &op
, nullptr, optional_yield(io_context
, yield
));
274 if (ret
== -ENOENT
) {
276 ldpp_dout(this, 5) << "INFO: queue: "
277 << queue_name
<< ". was removed. processing will stop" << dendl
;
281 ldpp_dout(this, 5) << "WARNING: queue: " << queue_name
<< " ownership moved to another daemon. processing will stop" << dendl
;
285 ldpp_dout(this, 5) << "WARNING: failed to get list of entries in queue and/or lock queue: "
286 << queue_name
<< ". error: " << ret
<< " (will retry)" << dendl
;
289 ret
= cls_2pc_queue_list_entries_result(obl
, entries
, &truncated
, end_marker
);
291 ldpp_dout(this, 5) << "WARNING: failed to parse list of entries in queue: "
292 << queue_name
<< ". error: " << ret
<< " (will retry)" << dendl
;
296 total_entries
= entries
.size();
297 if (total_entries
== 0) {
298 // nothing in the queue
301 // log when queue is not idle
302 ldpp_dout(this, 20) << "INFO: found: " << total_entries
<< " entries in: " << queue_name
<<
303 ". end marker is: " << end_marker
<< dendl
;
306 auto has_error
= false;
307 auto remove_entries
= false;
// waits until all per-entry coroutines spawned below finish
309 tokens_waiter
waiter(io_context
);
310 for (auto& entry
: entries
) {
312 // bail out on first error
315 // TODO pass entry pointer instead of by-value
316 spawn::spawn(yield
, [this, &queue_name
, entry_idx
, total_entries
, &end_marker
, &remove_entries
, &has_error
, &waiter
, entry
](yield_context yield
) {
317 const auto token
= waiter
.make_token();
318 if (process_entry(entry
, yield
)) {
319 ldpp_dout(this, 20) << "INFO: processing of entry: " <<
320 entry
.marker
<< " (" << entry_idx
<< "/" << total_entries
<< ") from: " << queue_name
<< " ok" << dendl
;
321 remove_entries
= true;
// keep end_marker at the smallest failed marker so only the
// fully-delivered prefix is removed from the queue
323 if (set_min_marker(end_marker
, entry
.marker
) < 0) {
324 ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker
<< ", " << entry
.marker
<< dendl
;
326 ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker
<< " from: " << queue_name
<< dendl
;
329 ldpp_dout(this, 20) << "INFO: processing of entry: " <<
330 entry
.marker
<< " (" << entry_idx
<< "/" << total_entries
<< ") from: " << queue_name
<< " failed" << dendl
;
332 }, make_stack_allocator());
336 // wait for all pending work to finish
337 waiter
.async_wait(yield
);
339 // delete all published entries from queue
340 if (remove_entries
) {
341 librados::ObjectWriteOperation op
;
343 rados::cls::lock::assert_locked(&op
, queue_name
+"_lock",
344 ClsLockType::EXCLUSIVE
,
347 cls_2pc_queue_remove_entries(op
, end_marker
);
348 // check ownership and deleted entries in one batch
349 const auto ret
= rgw_rados_operate(this, rados_ioctx
, queue_name
, &op
, optional_yield(io_context
, yield
));
350 if (ret
== -ENOENT
) {
352 ldpp_dout(this, 5) << "INFO: queue: "
353 << queue_name
<< ". was removed. processing will stop" << dendl
;
357 ldpp_dout(this, 5) << "WARNING: queue: " << queue_name
<< " ownership moved to another daemon. processing will stop" << dendl
;
361 ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker
<< " from queue: "
362 << queue_name
<< ". error: " << ret
<< dendl
;
364 ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker
<< " from queue: "
365 << queue_name
<< dendl
;
371 // list of owned queues
372 using owned_queues_t
= std::unordered_set
<std::string
>;
374 // process all queues
375 // find which of the queues is owned by this daemon and process it
// Top-level coroutine: periodically (with jitter, so daemons don't
// sync up) reads the queue list, tries to take/renew the exclusive
// lock on each queue, spawns process_queue() for newly owned queues,
// and garbage-collects queues whose processing has ended.
376 void process_queues(yield_context yield
) {
377 auto has_error
= false;
378 owned_queues_t owned_queues
;
380 // add randomness to the duration between queue checking
381 // to make sure that different daemons are not synced
382 std::random_device seed
;
383 std::mt19937
rnd_gen(seed());
384 const auto min_jitter
= 100; // ms
385 const auto max_jitter
= 500; // ms
386 std::uniform_int_distribution
<> duration_jitter(min_jitter
, max_jitter
);
// queues whose processing ended; filled by the per-queue coroutines
388 std::vector
<std::string
> queue_gc
;
389 std::mutex queue_gc_lock
;
391 Timer
timer(io_context
);
// retry faster after an error reading the queue list
392 const auto duration
= (has_error
?
393 std::chrono::milliseconds(queues_update_retry_ms
) : std::chrono::milliseconds(queues_update_period_ms
)) +
394 std::chrono::milliseconds(duration_jitter(rnd_gen
));
395 timer
.expires_from_now(duration
);
396 const auto tp
= ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration
);
397 ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp
) << dendl
;
398 boost::system::error_code ec
;
399 timer
.async_wait(yield
[ec
]);
402 auto ret
= read_queue_list(queues
, optional_yield(io_context
, yield
));
408 for (const auto& queue_name
: queues
) {
409 // try to lock the queue to check if it is owned by this rgw
410 // or if ownership needs to be taken
411 librados::ObjectWriteOperation op
;
413 rados::cls::lock::lock(&op
, queue_name
+"_lock",
414 ClsLockType::EXCLUSIVE
,
417 "" /*no description*/,
419 LOCK_FLAG_MAY_RENEW
);
421 ret
= rgw_rados_operate(this, rados_ioctx
, queue_name
, &op
, optional_yield(io_context
, yield
));
423 // lock is already taken by another RGW
424 ldpp_dout(this, 20) << "INFO: queue: " << queue_name
<< " owned (locked) by another daemon" << dendl
;
425 // if queue was owned by this RGW, processing should be stopped, queue would be deleted from list afterwards
428 if (ret
== -ENOENT
) {
429 // queue is deleted - processing will stop the next time we try to read from the queue
430 ldpp_dout(this, 10) << "INFO: queue: " << queue_name
<< " should not be locked - already deleted" << dendl
;
434 // failed to lock for another reason, continue to process other queues
435 ldpp_dout(this, 1) << "ERROR: failed to lock queue: " << queue_name
<< ". error: " << ret
<< dendl
;
439 // add queue to list of owned queues
440 if (owned_queues
.insert(queue_name
).second
) {
441 ldpp_dout(this, 10) << "INFO: queue: " << queue_name
<< " now owned (locked) by this daemon" << dendl
;
442 // start processing this queue
443 spawn::spawn(io_context
, [this, &queue_gc
, &queue_gc_lock
, queue_name
](yield_context yield
) {
444 process_queue(queue_name
, yield
);
445 // if queue processing ended, it means that the queue was removed or not owned anymore
446 // mark it for deletion
447 std::lock_guard
lock_guard(queue_gc_lock
);
448 queue_gc
.push_back(queue_name
);
449 ldpp_dout(this, 10) << "INFO: queue: " << queue_name
<< " marked for removal" << dendl
;
450 }, make_stack_allocator());
452 ldpp_dout(this, 20) << "INFO: queue: " << queue_name
<< " ownership (lock) renewed" << dendl
;
455 // erase all queue that were deleted
457 std::lock_guard
lock_guard(queue_gc_lock
);
458 std::for_each(queue_gc
.begin(), queue_gc
.end(), [this, &owned_queues
](const std::string
& queue_name
) {
459 owned_queues
.erase(queue_name
);
460 ldpp_dout(this, 20) << "INFO: queue: " << queue_name
<< " removed" << dendl
;
// join all worker threads (part of manager shutdown)
472 std::for_each(workers
.begin(), workers
.end(), [] (auto& worker
) { worker
.join(); });
475 // ctor: start all threads
// Stores configuration, spawns the process_queues() coroutine on the
// io_context, then starts worker_count named threads that run the
// io_context (the work_guard keeps them alive when idle).
476 Manager(CephContext
* _cct
, uint32_t _max_queue_size
, uint32_t _queues_update_period_ms
,
477 uint32_t _queues_update_retry_ms
, uint32_t _queue_idle_sleep_us
, u_int32_t failover_time_ms
,
478 uint32_t _stale_reservations_period_s
, uint32_t _reservations_cleanup_period_s
,
479 uint32_t _worker_count
, rgw::sal::RadosStore
* store
) :
480 max_queue_size(_max_queue_size
),
481 queues_update_period_ms(_queues_update_period_ms
),
482 queues_update_retry_ms(_queues_update_retry_ms
),
483 queue_idle_sleep_us(_queue_idle_sleep_us
),
484 failover_time(std::chrono::milliseconds(failover_time_ms
)),
// random cookie identifies this daemon's cls_lock ownership
486 lock_cookie(gen_rand_alphanumeric(cct
, COOKIE_LEN
)),
487 work_guard(boost::asio::make_work_guard(io_context
)),
488 worker_count(_worker_count
),
489 stale_reservations_period_s(_stale_reservations_period_s
),
490 reservations_cleanup_period_s(_reservations_cleanup_period_s
),
491 rados_ioctx(store
->getRados()->get_notif_pool_ctx())
493 spawn::spawn(io_context
, [this] (yield_context yield
) {
494 process_queues(yield
);
495 }, make_stack_allocator());
497 // start the worker threads to do the actual queue processing
498 const std::string WORKER_THREAD_NAME
= "notif-worker";
499 for (auto worker_id
= 0U; worker_id
< worker_count
; ++worker_id
) {
500 workers
.emplace_back([this]() {
503 } catch (const std::exception
& err
) {
504 ldpp_dout(this, 10) << "Notification worker failed with error: " << err
.what() << dendl
;
// name each thread "notif-worker<id>" for debugging
508 const auto rc
= ceph_pthread_setname(workers
.back().native_handle(),
509 (WORKER_THREAD_NAME
+std::to_string(worker_id
)).c_str());
510 ceph_assert(rc
== 0);
512 ldpp_dout(this, 10) << "Started notification manager with: " << worker_count
<< " workers" << dendl
;
// Creates (idempotently) a 2pc queue object for the topic and
// registers it in the queue list object's omap. The queue-list object
// name itself is reserved and rejected as a topic name.
515 int add_persistent_topic(const std::string
& topic_name
, optional_yield y
) {
516 if (topic_name
== Q_LIST_OBJECT_NAME
) {
517 ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME
<< " (conflict with queue list object name)" << dendl
;
520 librados::ObjectWriteOperation op
;
522 cls_2pc_queue_init(op
, topic_name
, max_queue_size
);
523 auto ret
= rgw_rados_operate(this, rados_ioctx
, topic_name
, &op
, y
);
524 if (ret
== -EEXIST
) {
525 // queue already exists - nothing to do
526 ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name
<< " already exists. nothing to do" << dendl
;
530 // failed to create queue
531 ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_name
<< ". error: " << ret
<< dendl
;
// add the topic to the queue list object's omap (empty value)
536 std::map
<std::string
, bufferlist
> new_topic
{{topic_name
, empty_bl
}};
537 op
.omap_set(new_topic
);
538 ret
= rgw_rados_operate(this, rados_ioctx
, Q_LIST_OBJECT_NAME
, &op
, y
);
540 ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_name
<< " to queue list. error: " << ret
<< dendl
;
543 ldpp_dout(this, 20) << "INFO: queue: " << topic_name
<< " added to queue list" << dendl
;
549 // note that the manager itself is not a singleton, and multiple instances may co-exist
550 // TODO make the pointer atomic in allocation and deallocation to avoid race conditions
551 static Manager
* s_manager
= nullptr;
// default tuning values for the manager (see ctor parameters)
553 constexpr size_t MAX_QUEUE_SIZE
= 128*1000*1000; // 128MB
554 constexpr uint32_t Q_LIST_UPDATE_MSEC
= 1000*30; // check queue list every 30seconds
555 constexpr uint32_t Q_LIST_RETRY_MSEC
= 1000; // retry every second if queue list update failed
556 constexpr uint32_t IDLE_TIMEOUT_USEC
= 100*1000; // idle sleep 100ms
557 constexpr uint32_t FAILOVER_TIME_MSEC
= 3*Q_LIST_UPDATE_MSEC
; // FAILOVER TIME 3x renew time
558 constexpr uint32_t WORKER_COUNT
= 1; // 1 worker thread
559 constexpr uint32_t STALE_RESERVATIONS_PERIOD_S
= 120; // cleanup reservations that are more than 2 minutes old
560 constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S
= 30; // reservation cleanup every 30 seconds
// Allocates the process-wide Manager with the default tuning above.
562 bool init(CephContext
* cct
, rgw::sal::RadosStore
* store
, const DoutPrefixProvider
*dpp
) {
566 // TODO: take conf from CephContext
567 s_manager
= new Manager(cct
, MAX_QUEUE_SIZE
,
568 Q_LIST_UPDATE_MSEC
, Q_LIST_RETRY_MSEC
,
569 IDLE_TIMEOUT_USEC
, FAILOVER_TIME_MSEC
,
570 STALE_RESERVATIONS_PERIOD_S
, RESERVATIONS_CLEANUP_PERIOD_S
,
// Free-function wrapper delegating to the global manager instance.
581 int add_persistent_topic(const std::string
& topic_name
, optional_yield y
) {
585 return s_manager
->add_persistent_topic(topic_name
, y
);
// Removes the topic's 2pc queue object and unregisters it from the
// queue list object's omap; -ENOENT (already removed) is not an error.
588 int remove_persistent_topic(const DoutPrefixProvider
* dpp
, librados::IoCtx
& rados_ioctx
, const std::string
& topic_name
, optional_yield y
) {
589 librados::ObjectWriteOperation op
;
591 auto ret
= rgw_rados_operate(dpp
, rados_ioctx
, topic_name
, &op
, y
);
592 if (ret
== -ENOENT
) {
593 // queue already removed - nothing to do
594 ldpp_dout(dpp
, 20) << "INFO: queue for topic: " << topic_name
<< " already removed. nothing to do" << dendl
;
598 // failed to remove queue
599 ldpp_dout(dpp
, 1) << "ERROR: failed to remove queue for topic: " << topic_name
<< ". error: " << ret
<< dendl
;
603 std::set
<std::string
> topic_to_remove
{{topic_name
}};
604 op
.omap_rm_keys(topic_to_remove
);
605 ret
= rgw_rados_operate(dpp
, rados_ioctx
, Q_LIST_OBJECT_NAME
, &op
, y
);
607 ldpp_dout(dpp
, 1) << "ERROR: failed to remove queue: " << topic_name
<< " from queue list. error: " << ret
<< dendl
;
610 ldpp_dout(dpp
, 20) << "INFO: queue: " << topic_name
<< " removed from queue list" << dendl
;
// Overload using the global manager as dpp and its notification ioctx.
614 int remove_persistent_topic(const std::string
& topic_name
, optional_yield y
) {
618 return remove_persistent_topic(s_manager
, s_manager
->rados_ioctx
, topic_name
, y
);
// Returns the object whose attributes should be used for the event:
// the copy source object when one exists, otherwise 'obj'. Fetches
// the attributes from RADOS if they were not loaded yet.
621 rgw::sal::Object
* get_object_with_atttributes(
622 const reservation_t
& res
, rgw::sal::Object
* obj
) {
623 // in case of copy obj, the tags and metadata are taken from source
624 const auto src_obj
= res
.src_object
? res
.src_object
: obj
;
625 if (src_obj
->get_attrs().empty()) {
626 if (!src_obj
->get_bucket()) {
627 src_obj
->set_bucket(res
.bucket
);
629 const auto ret
= src_obj
->get_obj_attrs(res
.yield
, res
.dpp
);
631 ldpp_dout(res
.dpp
, 20) << "failed to get attributes from object: " <<
632 src_obj
->get_key() << ". ret = " << ret
<< dendl
;
// Copies only the x-amz-meta-* entries from src into dest.
639 static inline void filter_amz_meta(meta_map_t
& dest
, const meta_map_t
& src
) {
640 std::copy_if(src
.cbegin(), src
.cend(),
641 std::inserter(dest
, dest
.end()),
643 return (boost::algorithm::starts_with(m
.first
, RGW_AMZ_META_PREFIX
));
// Fills res.x_meta_map from the object's user-metadata xattrs
// (RGW_ATTR_META_PREFIX) and marks them as fetched.
648 static inline void metadata_from_attributes(
649 reservation_t
& res
, rgw::sal::Object
* obj
) {
650 auto& metadata
= res
.x_meta_map
;
651 const auto src_obj
= get_object_with_atttributes(res
, obj
);
655 res
.metadata_fetched_from_attributes
= true;
656 for (auto& attr
: src_obj
->get_attrs()) {
657 if (boost::algorithm::starts_with(attr
.first
, RGW_ATTR_META_PREFIX
)) {
// strip the "user.rgw." style prefix, keep the user-visible key
658 std::string_view
key(attr
.first
);
659 key
.remove_prefix(sizeof(RGW_ATTR_PREFIX
)-1);
660 // we want to pass a null terminated version
661 // of the bufferlist, hence "to_str().c_str()"
662 metadata
.emplace(key
, attr
.second
.to_str().c_str());
// Decodes the object's tag xattr (RGW_ATTR_TAGS) into 'tags';
// undecodable tags are silently ignored.
667 static inline void tags_from_attributes(
668 const reservation_t
& res
, rgw::sal::Object
* obj
, KeyMultiValueMap
& tags
) {
669 const auto src_obj
= get_object_with_atttributes(res
, obj
);
673 const auto& attrs
= src_obj
->get_attrs();
674 const auto attr_iter
= attrs
.find(RGW_ATTR_TAGS
);
675 if (attr_iter
!= attrs
.end()) {
676 auto bliter
= attr_iter
->second
.cbegin();
679 ::decode(obj_tags
, bliter
);
680 } catch(buffer::error
&) {
681 // not able to decode tags
684 tags
= std::move(obj_tags
.get_tags());
688 // populate event from request
// Fills an rgw_pubsub_s3_event from the reservation, the object and
// the supplied mtime/etag/version/type. configurationId and
// opaque data are filled later from the matched configuration.
689 static inline void populate_event(reservation_t
& res
,
690 rgw::sal::Object
* obj
,
692 const ceph::real_time
& mtime
,
693 const std::string
& etag
,
694 const std::string
& version
,
695 EventType event_type
,
696 rgw_pubsub_s3_event
& event
) {
697 event
.eventTime
= mtime
;
698 event
.eventName
= to_event_string(event_type
);
699 event
.userIdentity
= res
.user_id
; // user that triggered the change
700 event
.x_amz_request_id
= res
.req_id
; // request ID of the original change
701 event
.x_amz_id_2
= res
.store
->getRados()->host_id
; // RGW on which the change was made
702 // configurationId is filled from notification configuration
703 event
.bucket_name
= res
.bucket
->get_name();
704 event
.bucket_ownerIdentity
= res
.bucket
->get_owner() ?
705 res
.bucket
->get_owner()->get_id().id
: res
.bucket
->get_info().owner
.id
;
706 const auto region
= res
.store
->get_zone()->get_zonegroup().get_api_name();
707 rgw::ARN
bucket_arn(res
.bucket
->get_key());
708 bucket_arn
.region
= region
;
709 event
.bucket_arn
= to_string(bucket_arn
);
710 event
.object_key
= res
.object_name
? *res
.object_name
: obj
->get_name();
711 event
.object_size
= size
;
712 event
.object_etag
= etag
;
713 event
.object_versionId
= version
;
714 event
.awsRegion
= region
;
715 // use timestamp as per key sequence id (hex encoded)
716 const utime_t
ts(real_clock::now());
717 boost::algorithm::hex((const char*)&ts
, (const char*)&ts
+ sizeof(utime_t
),
718 std::back_inserter(event
.object_sequencer
));
719 set_event_id(event
.id
, etag
, ts
);
720 event
.bucket_id
= res
.bucket
->get_bucket_id();
722 if (!res
.metadata_fetched_from_attributes
) {
723 // either no metadata exists or no metadata filter was used
724 metadata_from_attributes(res
, obj
);
726 event
.x_meta_map
= res
.x_meta_map
;
729 (*res
.tagset
).get_tags().empty()) {
730 // try to fetch the tags from the attributes
731 tags_from_attributes(res
, obj
, event
.tags
);
733 event
.tags
= (*res
.tagset
).get_tags();
735 // opaque data will be filled from topic configuration
// Returns whether the topic filter matches this request: checks event
// type, object key, metadata filter (fetching metadata from
// attributes when needed) and tag filter (request tags, cached
// req_state tags, or tags fetched from attributes, in that order).
738 static inline bool notification_match(reservation_t
& res
,
739 const rgw_pubsub_topic_filter
& filter
,
741 const RGWObjTags
* req_tags
) {
742 if (!match(filter
.events
, event
)) {
745 const auto obj
= res
.object
;
746 if (!match(filter
.s3_filter
.key_filter
,
747 res
.object_name
? *res
.object_name
: obj
->get_name())) {
751 if (!filter
.s3_filter
.metadata_filter
.kv
.empty()) {
752 // metadata filter exists
754 filter_amz_meta(res
.x_meta_map
, res
.s
->info
.x_meta_map
);
756 metadata_from_attributes(res
, obj
);
757 if (!match(filter
.s3_filter
.metadata_filter
, res
.x_meta_map
)) {
762 if (!filter
.s3_filter
.tag_filter
.kv
.empty()) {
765 // tags in the request
766 if (!match(filter
.s3_filter
.tag_filter
, req_tags
->get_tags())) {
769 } else if (res
.tagset
&& !(*res
.tagset
).get_tags().empty()) {
770 // tags were cached in req_state
771 if (!match(filter
.s3_filter
.tag_filter
, (*res
.tagset
).get_tags())) {
775 // try to fetch tags from the attributes
776 KeyMultiValueMap tags
;
777 tags_from_attributes(res
, obj
, tags
);
778 if (!match(filter
.s3_filter
.tag_filter
, tags
)) {
// First phase of the 2-phase publish: for every bucket topic whose
// filter matches the request, make a space reservation in the topic's
// persistent queue (when the destination is persistent) and record
// the topic + reservation id in res.topics for the later commit/abort.
787 int publish_reserve(const DoutPrefixProvider
* dpp
,
788 EventType event_type
,
790 const RGWObjTags
* req_tags
)
792 const RGWPubSub
ps(res
.store
, res
.user_tenant
);
793 const RGWPubSub::Bucket
ps_bucket(ps
, res
.bucket
);
794 rgw_pubsub_bucket_topics bucket_topics
;
795 auto rc
= ps_bucket
.get_topics(res
.dpp
, bucket_topics
, res
.yield
);
797 // failed to fetch bucket topics
800 for (const auto& bucket_topic
: bucket_topics
.topics
) {
801 const rgw_pubsub_topic_filter
& topic_filter
= bucket_topic
.second
;
802 const rgw_pubsub_topic
& topic_cfg
= topic_filter
.topic
;
803 if (!notification_match(res
, topic_filter
, event_type
, req_tags
)) {
804 // notification does not apply to req_state
807 ldpp_dout(res
.dpp
, 20) << "INFO: notification: '" << topic_filter
.s3_id
<<
808 "' on topic: '" << topic_cfg
.dest
.arn_topic
<<
809 "' and bucket: '" << res
.bucket
->get_name() <<
810 "' (unique topic: '" << topic_cfg
.name
<<
811 "') apply to event of type: '" << to_string(event_type
) << "'" << dendl
;
813 cls_2pc_reservation::id_t res_id
;
814 if (topic_cfg
.dest
.persistent
) {
815 // TODO: take default reservation size from conf
816 constexpr auto DEFAULT_RESERVATION
= 4*1024U; // 4K
817 res
.size
= DEFAULT_RESERVATION
;
818 librados::ObjectWriteOperation op
;
821 const auto& queue_name
= topic_cfg
.dest
.arn_topic
;
822 cls_2pc_queue_reserve(op
, res
.size
, 1, &obl
, &rval
);
// RETURNVEC is needed to read the reservation id back from the op
823 auto ret
= rgw_rados_operate(
824 res
.dpp
, res
.store
->getRados()->get_notif_pool_ctx(),
825 queue_name
, &op
, res
.yield
, librados::OPERATION_RETURNVEC
);
827 ldpp_dout(res
.dpp
, 1) <<
828 "ERROR: failed to reserve notification on queue: "
829 << queue_name
<< ". error: " << ret
<< dendl
;
830 // if no space is left in queue we ask client to slow down
831 return (ret
== -ENOSPC
) ? -ERR_RATE_LIMITED
: ret
;
833 ret
= cls_2pc_queue_reserve_result(obl
, res_id
);
835 ldpp_dout(res
.dpp
, 1) << "ERROR: failed to parse reservation id. error: " << ret
<< dendl
;
839 res
.topics
.emplace_back(topic_filter
.s3_id
, topic_cfg
, res_id
);
// Second phase of the publish: for each reserved topic, build the
// event. Persistent destinations encode an event_entry_t and commit
// it into the 2pc queue (growing the reservation first if the encoded
// entry exceeds the reserved size); non-persistent destinations push
// the event synchronously to the endpoint.
844 int publish_commit(rgw::sal::Object
* obj
,
846 const ceph::real_time
& mtime
,
847 const std::string
& etag
,
848 const std::string
& version
,
849 EventType event_type
,
851 const DoutPrefixProvider
* dpp
)
853 for (auto& topic
: res
.topics
) {
854 if (topic
.cfg
.dest
.persistent
&&
855 topic
.res_id
== cls_2pc_reservation::NO_ID
) {
856 // nothing to commit or already committed/aborted
859 event_entry_t event_entry
;
860 populate_event(res
, obj
, size
, mtime
, etag
, version
, event_type
, event_entry
.event
);
861 event_entry
.event
.configurationId
= topic
.configurationId
;
862 event_entry
.event
.opaque_data
= topic
.cfg
.opaque_data
;
863 if (topic
.cfg
.dest
.persistent
) {
864 event_entry
.push_endpoint
= std::move(topic
.cfg
.dest
.push_endpoint
);
865 event_entry
.push_endpoint_args
=
866 std::move(topic
.cfg
.dest
.push_endpoint_args
);
867 event_entry
.arn_topic
= topic
.cfg
.dest
.arn_topic
;
869 encode(event_entry
, bl
);
870 const auto& queue_name
= topic
.cfg
.dest
.arn_topic
;
871 if (bl
.length() > res
.size
) {
872 // try to make a larger reservation, fail only if this is not possible
873 ldpp_dout(dpp
, 5) << "WARNING: committed size: " << bl
.length()
874 << " exceeded reserved size: " << res
.size
876 " . trying to make a larger reservation on queue:" << queue_name
878 // first cancel the existing reservation
879 librados::ObjectWriteOperation op
;
880 cls_2pc_queue_abort(op
, topic
.res_id
);
881 auto ret
= rgw_rados_operate(
882 dpp
, res
.store
->getRados()->get_notif_pool_ctx(),
883 topic
.cfg
.dest
.arn_topic
, &op
,
886 ldpp_dout(dpp
, 1) << "ERROR: failed to abort reservation: "
888 " when trying to make a larger reservation on queue: " << queue_name
889 << ". error: " << ret
<< dendl
;
892 // now try to make a bigger one
895 cls_2pc_queue_reserve(op
, bl
.length(), 1, &obl
, &rval
);
896 ret
= rgw_rados_operate(
897 dpp
, res
.store
->getRados()->get_notif_pool_ctx(),
898 queue_name
, &op
, res
.yield
, librados::OPERATION_RETURNVEC
);
900 ldpp_dout(dpp
, 1) << "ERROR: failed to reserve extra space on queue: "
902 << ". error: " << ret
<< dendl
;
// out of space maps to a client throttling error
903 return (ret
== -ENOSPC
) ? -ERR_RATE_LIMITED
: ret
;
905 ret
= cls_2pc_queue_reserve_result(obl
, topic
.res_id
);
907 ldpp_dout(dpp
, 1) << "ERROR: failed to parse reservation id for "
908 "extra space. error: " << ret
<< dendl
;
// commit the encoded entry into the queue under the reservation
912 std::vector
<buffer::list
> bl_data_vec
{std::move(bl
)};
913 librados::ObjectWriteOperation op
;
914 cls_2pc_queue_commit(op
, bl_data_vec
, topic
.res_id
);
915 const auto ret
= rgw_rados_operate(
916 dpp
, res
.store
->getRados()->get_notif_pool_ctx(),
917 queue_name
, &op
, res
.yield
);
// reservation is consumed whether or not the commit succeeded
918 topic
.res_id
= cls_2pc_reservation::NO_ID
;
920 ldpp_dout(dpp
, 1) << "ERROR: failed to commit reservation to queue: "
921 << queue_name
<< ". error: " << ret
927 // TODO add endpoint LRU cache
// non-persistent destination: push synchronously to the endpoint
928 const auto push_endpoint
= RGWPubSubEndpoint::create(
929 topic
.cfg
.dest
.push_endpoint
,
930 topic
.cfg
.dest
.arn_topic
,
931 RGWHTTPArgs(topic
.cfg
.dest
.push_endpoint_args
, dpp
),
933 ldpp_dout(res
.dpp
, 20) << "INFO: push endpoint created: "
934 << topic
.cfg
.dest
.push_endpoint
<< dendl
;
935 const auto ret
= push_endpoint
->send_to_completion_async(
936 dpp
->get_cct(), event_entry
.event
, res
.yield
);
938 ldpp_dout(dpp
, 1) << "ERROR: push to endpoint "
939 << topic
.cfg
.dest
.push_endpoint
940 << " failed. error: " << ret
<< dendl
;
941 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_failed
);
944 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_ok
);
945 } catch (const RGWPubSubEndpoint::configuration_error
& e
) {
946 ldpp_dout(dpp
, 1) << "ERROR: failed to create push endpoint: "
947 << topic
.cfg
.dest
.push_endpoint
<< ". error: " << e
.what() << dendl
;
948 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_failed
);
// Aborts every outstanding persistent-queue reservation held by the
// reservation_t (no-op for non-persistent topics or reservations that
// were already committed/aborted).
956 int publish_abort(reservation_t
& res
) {
957 for (auto& topic
: res
.topics
) {
958 if (!topic
.cfg
.dest
.persistent
||
959 topic
.res_id
== cls_2pc_reservation::NO_ID
) {
960 // nothing to abort or already committed/aborted
963 const auto& queue_name
= topic
.cfg
.dest
.arn_topic
;
964 librados::ObjectWriteOperation op
;
965 cls_2pc_queue_abort(op
, topic
.res_id
);
966 const auto ret
= rgw_rados_operate(
967 res
.dpp
, res
.store
->getRados()->get_notif_pool_ctx(),
968 queue_name
, &op
, res
.yield
);
970 ldpp_dout(res
.dpp
, 1) << "ERROR: failed to abort reservation: "
972 " from queue: " << queue_name
<< ". error: " << ret
<< dendl
;
// mark as aborted regardless, so commit/abort won't retry it
975 topic
.res_id
= cls_2pc_reservation::NO_ID
;
// Reservation bound to a req_state: user identity and x-amz-meta
// map are taken from the request.
// NOTE(review): dpp is initialized from _s (req_state serves as the
// DoutPrefixProvider here), not from _dpp — assumed intentional.
980 reservation_t::reservation_t(const DoutPrefixProvider
* _dpp
,
981 rgw::sal::RadosStore
* _store
,
983 rgw::sal::Object
* _object
,
984 rgw::sal::Object
* _src_object
,
985 const std::string
* _object_name
,
987 dpp(_s
), store(_store
), s(_s
), size(0) /* XXX */,
988 object(_object
), src_object(_src_object
), bucket(_s
->bucket
.get()),
989 object_name(_object_name
),
991 metadata_fetched_from_attributes(false),
992 user_id(_s
->user
->get_id().id
),
993 user_tenant(_s
->user
->get_id().tenant
),
// pre-filter the request's x-amz-meta-* headers into the event map
997 filter_amz_meta(x_meta_map
, _s
->info
.x_meta_map
);
// Reservation without a req_state (e.g. lifecycle/internal callers):
// identity is supplied explicitly and s is null.
1000 reservation_t::reservation_t(const DoutPrefixProvider
* _dpp
,
1001 rgw::sal::RadosStore
* _store
,
1002 rgw::sal::Object
* _object
,
1003 rgw::sal::Object
* _src_object
,
1004 rgw::sal::Bucket
* _bucket
,
1005 const std::string
& _user_id
,
1006 const std::string
& _user_tenant
,
1007 const std::string
& _req_id
,
1009 dpp(_dpp
), store(_store
), s(nullptr), size(0) /* XXX */,
1010 object(_object
), src_object(_src_object
), bucket(_bucket
),
1011 object_name(nullptr),
1012 metadata_fetched_from_attributes(false),
1014 user_tenant(_user_tenant
),
// Destructor aborts any reservations that were never committed, so a
// failed request does not leak queue space.
1019 reservation_t::~reservation_t() {
1020 publish_abort(*this);
1023 } // namespace rgw::notify