// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "rgw_notify.h"
#include "cls/2pc_queue/cls_2pc_queue_client.h"
#include "cls/lock/cls_lock_client.h"
#include <boost/algorithm/hex.hpp>
#include <boost/context/protected_fixedsize_stack.hpp>
#include <spawn/spawn.hpp>
#include "rgw_sal_rados.h"
#include "rgw_pubsub.h"
#include "rgw_pubsub_push.h"
#include "rgw_perf_counters.h"
#include "common/dout.h"

#define dout_subsys ceph_subsys_rgw
namespace rgw::notify {

struct event_entry_t {
  rgw_pubsub_s3_event event;
  std::string push_endpoint;
  std::string push_endpoint_args;
  std::string arn_topic;

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(event, bl);
    encode(push_endpoint, bl);
    encode(push_endpoint_args, bl);
    encode(arn_topic, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(event, bl);
    decode(push_endpoint, bl);
    decode(push_endpoint_args, bl);
    decode(arn_topic, bl);
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(event_entry_t)

using queues_t = std::set<std::string>;
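
// Entries of this type are what flows through a persistent (2pc) queue:
// publish_reserve() stakes out space in the queue, publish_commit() encodes
// an event_entry_t into it, and the per-queue worker below decodes each
// entry and pushes it to its endpoint. A minimal round-trip sketch
// (illustrative only, not code used by this file):
//
//   event_entry_t in, out;
//   bufferlist bl;
//   encode(in, bl);          // provided via WRITE_CLASS_ENCODER above
//   auto it = bl.cbegin();
//   decode(out, it);         // what process_entry() does per queue entry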
// use mmap/mprotect to allocate 128k coroutine stacks
auto make_stack_allocator() {
  return boost::context::protected_fixedsize_stack{128*1024};
}
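
// Note: boost::context::protected_fixedsize_stack adds a guard page (via
// mprotect) at the end of each coroutine stack, so an overflow in one of the
// spawned coroutines faults immediately instead of silently corrupting a
// neighboring allocation. 128k is assumed to be enough for the processing
// lambdas below, which are spawned once per queue entry.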
class Manager : public DoutPrefixProvider {
  const size_t max_queue_size;
  const uint32_t queues_update_period_ms;
  const uint32_t queues_update_retry_ms;
  const uint32_t queue_idle_sleep_us;
  const utime_t failover_time;
  CephContext* const cct;
  librados::IoCtx& rados_ioctx;
  static constexpr auto COOKIE_LEN = 16;
  const std::string lock_cookie;
  boost::asio::io_context io_context;
  boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard;
  const uint32_t worker_count;
  std::vector<std::thread> workers;
  const uint32_t stale_reservations_period_s;
  const uint32_t reservations_cleanup_period_s;

  const std::string Q_LIST_OBJECT_NAME = "queues_list_object";

  CephContext* get_cct() const override { return cct; }
  unsigned get_subsys() const override { return dout_subsys; }
  std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw notify: "; }
  // read the list of queues from the queue list object
  int read_queue_list(queues_t& queues, optional_yield y) {
    constexpr auto max_chunk = 1024U;
    std::string start_after;
    bool more = true;
    int rval;
    while (more) {
      librados::ObjectReadOperation op;
      queues_t queues_chunk;
      op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
      const auto ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
      if (ret == -ENOENT) {
        // queue list object was not created - nothing to do
        return 0;
      }
      if (ret < 0) {
        // TODO: do we need to check on rval as well as ret?
        ldpp_dout(this, 1) << "ERROR: failed to read queue list. error: " << ret << dendl;
        return ret;
      }
      queues.merge(queues_chunk);
    }
    return 0;
  }
  // set m1 to be the minimum between m1 and m2
  static int set_min_marker(std::string& m1, const std::string m2) {
    cls_queue_marker mr1;
    cls_queue_marker mr2;
    if (mr1.from_str(m1.c_str()) < 0 || mr2.from_str(m2.c_str()) < 0) {
      return -EINVAL;
    }
    if (mr2.gen <= mr1.gen && mr2.offset < mr1.offset) {
      m1 = m2;
    }
    return 0;
  }
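
  // Note: cls_queue_marker encodes a queue generation plus an offset within
  // it; set_min_marker() only moves m1 back when m2 is at the same or an
  // older generation *and* at a lower offset, so the removal marker computed
  // in process_queue() never advances past a failed entry.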
  using Clock = ceph::coarse_mono_clock;
  using Executor = boost::asio::io_context::executor_type;
  using Timer = boost::asio::basic_waitable_timer<Clock,
        boost::asio::wait_traits<Clock>, Executor>;
  class tokens_waiter {
    const std::chrono::hours infinite_duration;
    size_t pending_tokens;
    Timer timer;

    struct token {
      tokens_waiter& waiter;
      token(tokens_waiter& _waiter) : waiter(_waiter) {
        ++waiter.pending_tokens;
      }

      ~token() {
        --waiter.pending_tokens;
        if (waiter.pending_tokens == 0) {
          waiter.timer.cancel();
        }
      }
    };

  public:
    tokens_waiter(boost::asio::io_context& io_context) :
      infinite_duration(1000),
      pending_tokens(0),
      timer(io_context) {}

    void async_wait(yield_context yield) {
      if (pending_tokens == 0) {
        return;
      }
      timer.expires_from_now(infinite_duration);
      boost::system::error_code ec;
      timer.async_wait(yield[ec]);
      ceph_assert(ec == boost::system::errc::operation_canceled);
    }

    token make_token() {
      return token(*this);
    }
  };
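
  // tokens_waiter acts as a coroutine-friendly countdown latch: each spawned
  // entry processor holds a token (an RAII counter), and the destructor of
  // the last token cancels the practically-infinite (1000 hour) timer that
  // async_wait() blocks on; the ceph_assert checks that the wakeup really
  // came from that cancellation. Intended use, as in process_queue() below:
  //
  //   tokens_waiter waiter(io_context);
  //   spawn::spawn(yield, [&](yield_context yield) {
  //     const auto token = waiter.make_token(); // ++pending_tokens
  //     // ... async work; token destroyed on scope exit ...
  //   }, make_stack_allocator());
  //   waiter.async_wait(yield); // returns once all tokens are destroyed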
  // processing of a specific entry
  // return whether processing was successful (true) or not (false)
  bool process_entry(const cls_queue_entry& entry, yield_context yield) {
    event_entry_t event_entry;
    auto iter = entry.data.cbegin();
    try {
      decode(event_entry, iter);
    } catch (buffer::error& err) {
      ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
      return false;
    }
    try {
      // TODO move endpoint creation to queue level
      const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
          RGWHTTPArgs(event_entry.push_endpoint_args, this),
          cct);
      ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
        " for entry: " << entry.marker << dendl;
      const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
      if (ret < 0) {
        ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
          << " failed. error: " << ret << " (will retry)" << dendl;
        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
        return false;
      }
      ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
        << " ok" << dendl;
      if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
      return true;
    } catch (const RGWPubSubEndpoint::configuration_error& e) {
      ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: "
          << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
      return false;
    }
  }
  // clean stale reservations from queue
  void cleanup_queue(const std::string& queue_name, yield_context yield) {
    while (true) {
      ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
      const auto now = ceph::coarse_real_time::clock::now();
      const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
      librados::ObjectWriteOperation op;
      op.assert_exists();
      rados::cls::lock::assert_locked(&op, queue_name+"_lock",
        ClsLockType::EXCLUSIVE,
        lock_cookie,
        "" /*no tag*/);
      cls_2pc_queue_expire_reservations(op, stale_time);
      // check ownership and do reservation cleanup in one batch
      auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
      if (ret == -ENOENT) {
        // queue was deleted
        ldpp_dout(this, 5) << "INFO: queue: "
          << queue_name << ". was removed. cleanup will stop" << dendl;
        return;
      }
      if (ret == -EBUSY) {
        ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
        return;
      }
      if (ret < 0) {
        ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
          << ". error: " << ret << dendl;
      }
      Timer timer(io_context);
      timer.expires_from_now(std::chrono::seconds(reservations_cleanup_period_s));
      boost::system::error_code ec;
      timer.async_wait(yield[ec]);
    }
  }
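
  // Note: batching rados::cls::lock::assert_locked() with the actual queue
  // update in one ObjectWriteOperation makes the ownership check atomic: if
  // another daemon took over the lock, the whole operation fails with -EBUSY
  // and the loop above exits instead of touching a queue it no longer owns.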
  // processing of a specific queue
  void process_queue(const std::string& queue_name, yield_context yield) {
    constexpr auto max_elements = 1024;
    auto is_idle = false;
    const std::string start_marker;

    // start the cleanup coroutine for the queue
    spawn::spawn(io_context, [this, queue_name](yield_context yield) {
          cleanup_queue(queue_name, yield);
        }, make_stack_allocator());

    while (true) {
      // if queue was empty the last time, sleep for idle timeout
      if (is_idle) {
        Timer timer(io_context);
        timer.expires_from_now(std::chrono::microseconds(queue_idle_sleep_us));
        boost::system::error_code ec;
        timer.async_wait(yield[ec]);
      }

      // get list of entries in the queue
      is_idle = true;
      bool truncated = false;
      std::string end_marker;
      std::vector<cls_queue_entry> entries;
      auto total_entries = 0U;
      {
        librados::ObjectReadOperation op;
        op.assert_exists();
        bufferlist obl;
        int rval;
        rados::cls::lock::assert_locked(&op, queue_name+"_lock",
          ClsLockType::EXCLUSIVE,
          lock_cookie,
          "" /*no tag*/);
        cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval);
        // check ownership and list entries in one batch
        auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
        if (ret == -ENOENT) {
          // queue was deleted
          ldpp_dout(this, 5) << "INFO: queue: "
            << queue_name << ". was removed. processing will stop" << dendl;
          return;
        }
        if (ret == -EBUSY) {
          ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
          return;
        }
        if (ret < 0) {
          ldpp_dout(this, 5) << "WARNING: failed to get list of entries in queue and/or lock queue: "
            << queue_name << ". error: " << ret << " (will retry)" << dendl;
          continue;
        }
        ret = cls_2pc_queue_list_entries_result(obl, entries, &truncated, end_marker);
        if (ret < 0) {
          ldpp_dout(this, 5) << "WARNING: failed to parse list of entries in queue: "
            << queue_name << ". error: " << ret << " (will retry)" << dendl;
          continue;
        }
      }
      total_entries = entries.size();
      if (total_entries == 0) {
        // nothing in the queue
        continue;
      }
      // log when queue is not idle
      ldpp_dout(this, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name <<
        ". end marker is: " << end_marker << dendl;

      is_idle = false;
      auto has_error = false;
      auto remove_entries = false;
      auto entry_idx = 1U;
      tokens_waiter waiter(io_context);
      for (auto& entry : entries) {
        if (has_error) {
          // bail out on first error
          break;
        }
        // TODO pass entry pointer instead of by-value
        spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](yield_context yield) {
              const auto token = waiter.make_token();
              if (process_entry(entry, yield)) {
                ldpp_dout(this, 20) << "INFO: processing of entry: " <<
                  entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl;
                remove_entries = true;
              } else {
                if (set_min_marker(end_marker, entry.marker) < 0) {
                  ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
                } else {
                  ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
                }
                has_error = true;
                ldpp_dout(this, 20) << "INFO: processing of entry: " <<
                  entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
              }
            }, make_stack_allocator());
        ++entry_idx;
      }

      // wait for all pending work to finish
      waiter.async_wait(yield);

      // delete all published entries from queue
      if (remove_entries) {
        librados::ObjectWriteOperation op;
        op.assert_exists();
        rados::cls::lock::assert_locked(&op, queue_name+"_lock",
          ClsLockType::EXCLUSIVE,
          lock_cookie,
          "" /*no tag*/);
        cls_2pc_queue_remove_entries(op, end_marker);
        // check ownership and delete entries in one batch
        const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
        if (ret == -ENOENT) {
          // queue was deleted
          ldpp_dout(this, 5) << "INFO: queue: "
            << queue_name << ". was removed. processing will stop" << dendl;
          return;
        }
        if (ret == -EBUSY) {
          ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
          return;
        }
        if (ret < 0) {
          ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker << " from queue: "
            << queue_name << ". error: " << ret << dendl;
        } else {
          ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker << " from queue: "
            << queue_name << dendl;
        }
      }
    }
  }
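
  // Note: entry removal above is deliberately conservative: entries are
  // trimmed only up to the smallest failed marker (tracked via
  // set_min_marker), so a failed push is retried on the next iteration while
  // everything before it is removed. The result is at-least-once delivery;
  // endpoints may see duplicate events after a retry.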
  // list of owned queues
  using owned_queues_t = std::unordered_set<std::string>;
  // process all queues
  // find which of the queues is owned by this daemon and process it
  void process_queues(yield_context yield) {
    auto has_error = false;
    owned_queues_t owned_queues;

    // add randomness to the duration between queue checking
    // to make sure that different daemons are not synced
    std::random_device seed;
    std::mt19937 rnd_gen(seed());
    const auto min_jitter = 100; // ms
    const auto max_jitter = 500; // ms
    std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter);

    std::vector<std::string> queue_gc;
    std::mutex queue_gc_lock;
    while (true) {
      Timer timer(io_context);
      const auto duration = (has_error ?
        std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) +
        std::chrono::milliseconds(duration_jitter(rnd_gen));
      timer.expires_from_now(duration);
      const auto tp = ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration);
      ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp) << dendl;
      boost::system::error_code ec;
      timer.async_wait(yield[ec]);

      queues_t queues;
      auto ret = read_queue_list(queues, optional_yield(io_context, yield));
      if (ret < 0) {
        has_error = true;
        continue;
      }

      for (const auto& queue_name : queues) {
        // try to lock the queue to check if it is owned by this rgw
        // or if ownership needs to be taken
        librados::ObjectWriteOperation op;
        op.assert_exists();
        rados::cls::lock::lock(&op, queue_name+"_lock",
              ClsLockType::EXCLUSIVE,
              lock_cookie,
              "" /*no tag*/,
              "" /*no description*/,
              failover_time,
              LOCK_FLAG_MAY_RENEW);

        ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
        if (ret == -EBUSY) {
          // lock is already taken by another RGW
          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
          // if queue was owned by this RGW, processing should be stopped, queue would be deleted from list afterwards
          continue;
        }
        if (ret == -ENOENT) {
          // queue is deleted - processing will stop the next time we try to read from the queue
          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " should not be locked - already deleted" << dendl;
          continue;
        }
        if (ret < 0) {
          // failed to lock for another reason, continue to process other queues
          ldpp_dout(this, 1) << "ERROR: failed to lock queue: " << queue_name << ". error: " << ret << dendl;
          has_error = true;
          continue;
        }
        // add queue to list of owned queues
        if (owned_queues.insert(queue_name).second) {
          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
          // start processing this queue
          spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](yield_context yield) {
                process_queue(queue_name, yield);
                // if queue processing ended, it means that the queue was removed or is not owned anymore
                // mark it for deletion
                std::lock_guard lock_guard(queue_gc_lock);
                queue_gc.push_back(queue_name);
                ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
              }, make_stack_allocator());
        } else {
          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl;
        }
      }
      // erase all queues that were deleted
      {
        std::lock_guard lock_guard(queue_gc_lock);
        std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
          owned_queues.erase(queue_name);
          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
        });
        queue_gc.clear();
      }
    }
  }
public:

  ~Manager() {
    work_guard.reset();
    io_context.stop();
    std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
  }
  // ctor: start all threads
  Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms,
          uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, uint32_t failover_time_ms,
          uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
          uint32_t _worker_count, rgw::sal::RadosStore* store) :
    max_queue_size(_max_queue_size),
    queues_update_period_ms(_queues_update_period_ms),
    queues_update_retry_ms(_queues_update_retry_ms),
    queue_idle_sleep_us(_queue_idle_sleep_us),
    failover_time(std::chrono::milliseconds(failover_time_ms)),
    cct(_cct),
    rados_ioctx(store->getRados()->get_notif_pool_ctx()),
    lock_cookie(gen_rand_alphanumeric(cct, COOKIE_LEN)),
    work_guard(boost::asio::make_work_guard(io_context)),
    worker_count(_worker_count),
    stale_reservations_period_s(_stale_reservations_period_s),
    reservations_cleanup_period_s(_reservations_cleanup_period_s)
  {
    spawn::spawn(io_context, [this](yield_context yield) {
          process_queues(yield);
        }, make_stack_allocator());

    // start the worker threads to do the actual queue processing
    const std::string WORKER_THREAD_NAME = "notif-worker";
    for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
      workers.emplace_back([this]() {
        try {
          io_context.run();
        } catch (const std::exception& err) {
          ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl;
          throw(err);
        }
      });
      const auto rc = ceph_pthread_setname(workers.back().native_handle(),
        (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
      ceph_assert(rc == 0);
    }
    ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
  }
  int add_persistent_topic(const std::string& topic_name, optional_yield y) {
    if (topic_name == Q_LIST_OBJECT_NAME) {
      ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
      return -EINVAL;
    }
    librados::ObjectWriteOperation op;
    op.create(true);
    cls_2pc_queue_init(op, topic_name, max_queue_size);
    auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
    if (ret == -EEXIST) {
      // queue already exists - nothing to do
      ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl;
      return 0;
    }
    if (ret < 0) {
      // failed to create queue
      ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl;
      return ret;
    }

    bufferlist empty_bl;
    std::map<std::string, bufferlist> new_topic{{topic_name, empty_bl}};
    op.omap_set(new_topic);
    ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
    if (ret < 0) {
      ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl;
      return ret;
    }
    ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list" << dendl;
    return 0;
  }
  int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
    librados::ObjectWriteOperation op;
    op.remove();
    auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
    if (ret == -ENOENT) {
      // queue already removed - nothing to do
      ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
      return 0;
    }
    if (ret < 0) {
      // failed to remove queue
      ldpp_dout(this, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
      return ret;
    }

    std::set<std::string> topic_to_remove{{topic_name}};
    op.omap_rm_keys(topic_to_remove);
    ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
    if (ret < 0) {
      ldpp_dout(this, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
      return ret;
    }
    ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl;
    return 0;
  }
};
// note that the manager itself is not a singleton, and multiple instances may co-exist
// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;

constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30; // check queue list every 30 seconds
constexpr uint32_t Q_LIST_RETRY_MSEC = 1000; // retry every second if queue list update failed
constexpr uint32_t IDLE_TIMEOUT_USEC = 100*1000; // idle sleep 100ms
constexpr uint32_t FAILOVER_TIME_MSEC = 3*Q_LIST_UPDATE_MSEC; // failover time is 3x the renew time
constexpr uint32_t WORKER_COUNT = 1; // 1 worker thread
constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120; // cleanup reservations that are more than 2 minutes old
constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
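
// Note how the constants relate: the queue lock is taken with
// LOCK_FLAG_MAY_RENEW roughly every Q_LIST_UPDATE_MSEC (plus jitter), while
// FAILOVER_TIME_MSEC is 3x that period, so a live owner renews its lock well
// before expiration and a crashed owner loses its queues after at most ~3
// renew periods.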
bool init(CephContext* cct, rgw::sal::RadosStore* store, const DoutPrefixProvider* dpp) {
  if (s_manager) {
    return false;
  }
  // TODO: take conf from CephContext
  s_manager = new Manager(cct, MAX_QUEUE_SIZE,
      Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC,
      IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
      STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
      WORKER_COUNT, store);
  return true;
}

int add_persistent_topic(const std::string& topic_name, optional_yield y) {
  if (!s_manager) {
    return -EAGAIN;
  }
  return s_manager->add_persistent_topic(topic_name, y);
}

int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
  if (!s_manager) {
    return -EAGAIN;
  }
  return s_manager->remove_persistent_topic(topic_name, y);
}
rgw::sal::Object* get_object_with_attributes(
  const reservation_t& res, rgw::sal::Object* obj) {
  // in case of copy obj, the tags and metadata are taken from source
  const auto src_obj = res.src_object ? res.src_object : obj;
  if (src_obj->get_attrs().empty()) {
    if (!src_obj->get_bucket()) {
      src_obj->set_bucket(res.bucket);
    }
    if (src_obj->get_obj_attrs(res.obj_ctx, res.yield, res.dpp) < 0) {
      return nullptr;
    }
  }
  return src_obj;
}
static inline void metadata_from_attributes(
  reservation_t& res, rgw::sal::Object* obj) {
  auto& metadata = res.x_meta_map;
  const auto src_obj = get_object_with_attributes(res, obj);
  if (!src_obj) {
    return;
  }
  for (auto& attr : src_obj->get_attrs()) {
    if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
      std::string_view key(attr.first);
      key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1);
      // we want to pass a null terminated version
      // of the bufferlist, hence "to_str().c_str()"
      metadata.emplace(key, attr.second.to_str().c_str());
    }
  }
}
static inline void tags_from_attributes(
  const reservation_t& res, rgw::sal::Object* obj, KeyMultiValueMap& tags) {
  const auto src_obj = get_object_with_attributes(res, obj);
  if (!src_obj) {
    return;
  }
  const auto& attrs = src_obj->get_attrs();
  const auto attr_iter = attrs.find(RGW_ATTR_TAGS);
  if (attr_iter != attrs.end()) {
    auto bliter = attr_iter->second.cbegin();
    RGWObjTags obj_tags;
    try {
      ::decode(obj_tags, bliter);
    } catch (buffer::error&) {
      // not able to decode tags
      return;
    }
    tags = std::move(obj_tags.get_tags());
  }
}
// populate event from request
static inline void populate_event(reservation_t& res,
        rgw::sal::Object* obj,
        uint64_t size,
        const ceph::real_time& mtime,
        const std::string& etag,
        const std::string& version,
        EventType event_type,
        rgw_pubsub_s3_event& event) {
  event.eventTime = mtime;
  event.eventName = to_event_string(event_type);
  event.userIdentity = res.user_id;    // user that triggered the change
  event.x_amz_request_id = res.req_id; // request ID of the original change
  event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made
  // configurationId is filled from notification configuration
  event.bucket_name = res.bucket->get_name();
  event.bucket_ownerIdentity = res.bucket->get_owner()->get_id().id;
  event.bucket_arn = to_string(rgw::ARN(res.bucket->get_key()));
  event.object_key = res.object_name ? *res.object_name : obj->get_name();
  event.object_size = size;
  event.object_etag = etag;
  event.object_versionId = version;
  event.awsRegion = res.store->get_zone()->get_zonegroup().api_name;
  // use timestamp as per key sequence id (hex encoded)
  const utime_t ts(real_clock::now());
  boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
          std::back_inserter(event.object_sequencer));
  set_event_id(event.id, etag, ts);
  event.bucket_id = res.bucket->get_bucket_id();
  // pass metadata
  if (res.x_meta_map.empty()) {
    // no metadata cached:
    // either no metadata exist or no metadata filter was used
    metadata_from_attributes(res, obj);
  }
  event.x_meta_map = res.x_meta_map;
  // pass tags
  if (!res.tagset ||
      (*res.tagset).get_tags().empty()) {
    // try to fetch the tags from the attributes
    tags_from_attributes(res, obj, event.tags);
  } else {
    event.tags = (*res.tagset).get_tags();
  }
  // opaque data will be filled from topic configuration
}
static inline bool notification_match(reservation_t& res,
              const rgw_pubsub_topic_filter& filter,
              EventType event,
              const RGWObjTags* req_tags) {
  if (!match(filter.events, event)) {
    return false;
  }
  const auto obj = res.object;
  if (!match(filter.s3_filter.key_filter,
        res.object_name ? *res.object_name : obj->get_name())) {
    return false;
  }

  if (!filter.s3_filter.metadata_filter.kv.empty()) {
    // metadata filter exists
    if (res.s) {
      res.x_meta_map = res.s->info.x_meta_map;
    }
    metadata_from_attributes(res, obj);
    if (!match(filter.s3_filter.metadata_filter, res.x_meta_map)) {
      return false;
    }
  }

  if (!filter.s3_filter.tag_filter.kv.empty()) {
    // tag filter exists
    if (req_tags) {
      // tags in the request
      if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) {
        return false;
      }
    } else if (res.tagset && !(*res.tagset).get_tags().empty()) {
      // tags were cached in req_state
      if (!match(filter.s3_filter.tag_filter, (*res.tagset).get_tags())) {
        return false;
      }
    } else {
      // try to fetch tags from the attributes
      KeyMultiValueMap tags;
      tags_from_attributes(res, obj, tags);
      if (!match(filter.s3_filter.tag_filter, tags)) {
        return false;
      }
    }
  }

  return true;
}
int publish_reserve(const DoutPrefixProvider* dpp,
        EventType event_type,
        reservation_t& res,
        const RGWObjTags* req_tags)
{
  RGWPubSub ps(res.store, res.user_tenant);
  RGWPubSub::Bucket ps_bucket(&ps, res.bucket->get_key());
  rgw_pubsub_bucket_topics bucket_topics;
  auto rc = ps_bucket.get_topics(&bucket_topics);
  if (rc < 0) {
    // failed to fetch bucket topics
    return rc;
  }
  for (const auto& bucket_topic : bucket_topics.topics) {
    const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
    const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
    if (!notification_match(res, topic_filter, event_type, req_tags)) {
      // notification does not apply to req_state
      continue;
    }
    ldpp_dout(res.dpp, 20) << "INFO: notification: '" << topic_filter.s3_id <<
      "' on topic: '" << topic_cfg.dest.arn_topic <<
      "' and bucket: '" << res.bucket->get_name() <<
      "' (unique topic: '" << topic_cfg.name <<
      "') apply to event of type: '" << to_string(event_type) << "'" << dendl;

    cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
    if (topic_cfg.dest.persistent) {
      // TODO: take default reservation size from conf
      constexpr auto DEFAULT_RESERVATION = 4*1024U; // 4K
      res.size = DEFAULT_RESERVATION;
      librados::ObjectWriteOperation op;
      bufferlist obl;
      int rval;
      const auto& queue_name = topic_cfg.dest.arn_topic;
      cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
      auto ret = rgw_rados_operate(
        res.dpp, res.store->getRados()->get_notif_pool_ctx(),
        queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
      if (ret < 0) {
        ldpp_dout(res.dpp, 1) <<
          "ERROR: failed to reserve notification on queue: "
          << queue_name << ". error: " << ret << dendl;
        // if no space is left in queue we ask client to slow down
        return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
      }
      ret = cls_2pc_queue_reserve_result(obl, res_id);
      if (ret < 0) {
        ldpp_dout(res.dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
        return ret;
      }
    }
    res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id);
  }
  return 0;
}
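
// The reserve/commit/abort triplet implements the two-phase protocol of
// cls_2pc_queue: publish_reserve() stakes out DEFAULT_RESERVATION bytes
// before the final event size is known, publish_commit() fills (or enlarges)
// that reservation with the encoded event, and publish_abort() releases it
// if the request fails in between. Reservations that are never committed or
// aborted (e.g. after an RGW crash) are reclaimed by cleanup_queue() via
// cls_2pc_queue_expire_reservations().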
int publish_commit(rgw::sal::Object* obj,
      uint64_t size,
      const ceph::real_time& mtime,
      const std::string& etag,
      const std::string& version,
      EventType event_type,
      reservation_t& res,
      const DoutPrefixProvider* dpp)
{
  for (auto& topic : res.topics) {
    if (topic.cfg.dest.persistent &&
        topic.res_id == cls_2pc_reservation::NO_ID) {
      // nothing to commit or already committed/aborted
      continue;
    }
    event_entry_t event_entry;
    populate_event(res, obj, size, mtime, etag, version, event_type, event_entry.event);
    event_entry.event.configurationId = topic.configurationId;
    event_entry.event.opaque_data = topic.cfg.opaque_data;
    if (topic.cfg.dest.persistent) {
      event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
      event_entry.push_endpoint_args =
        std::move(topic.cfg.dest.push_endpoint_args);
      event_entry.arn_topic = topic.cfg.dest.arn_topic;
      bufferlist bl;
      encode(event_entry, bl);
      const auto& queue_name = topic.cfg.dest.arn_topic;
      if (bl.length() > res.size) {
        // try to make a larger reservation, fail only if this is not possible
        ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length()
          << " exceeded reserved size: " << res.size
          << " . trying to make a larger reservation on queue:" << queue_name
          << dendl;
        // first cancel the existing reservation
        librados::ObjectWriteOperation op;
        cls_2pc_queue_abort(op, topic.res_id);
        auto ret = rgw_rados_operate(
          dpp, res.store->getRados()->get_notif_pool_ctx(),
          topic.cfg.dest.arn_topic, &op,
          res.yield);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: "
            << topic.res_id <<
            " when trying to make a larger reservation on queue: " << queue_name
            << ". error: " << ret << dendl;
          return ret;
        }
        // now try to make a bigger one
        bufferlist obl;
        int rval;
        cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
        ret = rgw_rados_operate(
          dpp, res.store->getRados()->get_notif_pool_ctx(),
          queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: "
            << queue_name
            << ". error: " << ret << dendl;
          return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
        }
        ret = cls_2pc_queue_reserve_result(obl, topic.res_id);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for "
            "extra space. error: " << ret << dendl;
          return ret;
        }
      }
      std::vector<buffer::list> bl_data_vec{std::move(bl)};
      librados::ObjectWriteOperation op;
      cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
      const auto ret = rgw_rados_operate(
        dpp, res.store->getRados()->get_notif_pool_ctx(),
        queue_name, &op, res.yield);
      topic.res_id = cls_2pc_reservation::NO_ID;
      if (ret < 0) {
        ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
          << queue_name << ". error: " << ret
          << dendl;
        return ret;
      }
    } else {
      try {
        // TODO add endpoint LRU cache
        const auto push_endpoint = RGWPubSubEndpoint::create(
          topic.cfg.dest.push_endpoint,
          topic.cfg.dest.arn_topic,
          RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp),
          dpp->get_cct());
        ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: "
          << topic.cfg.dest.push_endpoint << dendl;
        const auto ret = push_endpoint->send_to_completion_async(
          dpp->get_cct(), event_entry.event, res.yield);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: push to endpoint "
            << topic.cfg.dest.push_endpoint
            << " failed. error: " << ret << dendl;
          if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
          return ret;
        }
        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
      } catch (const RGWPubSubEndpoint::configuration_error& e) {
        ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: "
          << topic.cfg.dest.push_endpoint << ". error: " << e.what() << dendl;
        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
        return -EINVAL;
      }
    }
  }
  return 0;
}
extern int publish_abort(reservation_t& res) {
  for (auto& topic : res.topics) {
    if (!topic.cfg.dest.persistent ||
        topic.res_id == cls_2pc_reservation::NO_ID) {
      // nothing to abort or already committed/aborted
      continue;
    }
    const auto& queue_name = topic.cfg.dest.arn_topic;
    librados::ObjectWriteOperation op;
    cls_2pc_queue_abort(op, topic.res_id);
    const auto ret = rgw_rados_operate(
      res.dpp, res.store->getRados()->get_notif_pool_ctx(),
      queue_name, &op, res.yield);
    if (ret < 0) {
      ldpp_dout(res.dpp, 1) << "ERROR: failed to abort reservation: "
        << topic.res_id <<
        " from queue: " << queue_name << ". error: " << ret << dendl;
      return ret;
    }
    topic.res_id = cls_2pc_reservation::NO_ID;
  }
  return 0;
}
reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
          rgw::sal::RadosStore* _store,
          req_state* _s,
          rgw::sal::Object* _object,
          rgw::sal::Object* _src_object,
          const std::string* _object_name) :
  dpp(_s), store(_store), s(_s), size(0) /* XXX */, obj_ctx(_s->obj_ctx),
  object(_object), src_object(_src_object), bucket(_s->bucket.get()),
  object_name(_object_name),
  tagset(_s->tagset),
  x_meta_map(_s->info.x_meta_map),
  user_id(_s->user->get_id().id),
  user_tenant(_s->user->get_id().tenant),
  req_id(_s->req_id),
  yield(_s->yield)
{}

reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
          rgw::sal::RadosStore* _store,
          RGWObjectCtx* _obj_ctx,
          rgw::sal::Object* _object,
          rgw::sal::Object* _src_object,
          rgw::sal::Bucket* _bucket,
          std::string& _user_id,
          std::string& _user_tenant,
          std::string& _req_id,
          optional_yield y) :
  dpp(_dpp), store(_store), s(nullptr), size(0) /* XXX */,
  obj_ctx(_obj_ctx),
  object(_object), src_object(_src_object), bucket(_bucket),
  object_name(nullptr),
  user_id(_user_id),
  user_tenant(_user_tenant),
  req_id(_req_id),
  yield(y)
{}

reservation_t::~reservation_t() {
  publish_abort(*this);
}

} // namespace rgw::notify