// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "rgw_notify.h"
#include "cls/2pc_queue/cls_2pc_queue_client.h"
#include "cls/lock/cls_lock_client.h"
#include <memory>
#include <boost/algorithm/hex.hpp>
#include <boost/context/protected_fixedsize_stack.hpp>
#include <spawn/spawn.hpp>
#include "rgw_sal_rados.h"
#include "rgw_pubsub.h"
#include "rgw_pubsub_push.h"
#include "rgw_perf_counters.h"
#include "common/dout.h"
#include <chrono>

#define dout_subsys ceph_subsys_rgw

namespace rgw::notify {

struct event_entry_t {
  rgw_pubsub_s3_event event;
  std::string push_endpoint;
  std::string push_endpoint_args;
  std::string arn_topic;

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(event, bl);
    encode(push_endpoint, bl);
    encode(push_endpoint_args, bl);
    encode(arn_topic, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(event, bl);
    decode(push_endpoint, bl);
    decode(push_endpoint_args, bl);
    decode(arn_topic, bl);
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(event_entry_t)

using queues_t = std::set<std::string>;

// use mmap/mprotect to allocate 128k coroutine stacks
auto make_stack_allocator() {
  return boost::context::protected_fixedsize_stack{128*1024};
}

const std::string Q_LIST_OBJECT_NAME = "queues_list_object";

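// Overview of the persistent notification path (a descriptive note, based on the code below):
// - every persistent topic is backed by a RADOS object holding a cls_2pc_queue, and all such
//   queues are registered as omap keys on the object named by Q_LIST_OBJECT_NAME
// - publishing is done in two phases: publish_reserve() reserves space in the queue,
//   publish_commit() writes the encoded event_entry_t, and publish_abort() (or the
//   reservation_t destructor) releases an uncommitted reservation
// - the Manager below owns the io_context, worker threads and coroutines that discover the
//   queues, take ownership of them via cls_lock, and push the queued events to their endpoints
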
class Manager : public DoutPrefixProvider {
  const size_t max_queue_size;
  const uint32_t queues_update_period_ms;
  const uint32_t queues_update_retry_ms;
  const uint32_t queue_idle_sleep_us;
  const utime_t failover_time;
  CephContext* const cct;
  static constexpr auto COOKIE_LEN = 16;
  const std::string lock_cookie;
  boost::asio::io_context io_context;
  boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard;
  const uint32_t worker_count;
  std::vector<std::thread> workers;
  const uint32_t stale_reservations_period_s;
  const uint32_t reservations_cleanup_period_s;
public:
  librados::IoCtx& rados_ioctx;
private:

  CephContext *get_cct() const override { return cct; }
  unsigned get_subsys() const override { return dout_subsys; }
  std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw notify: "; }

  // read the list of queues from the queue list object
  int read_queue_list(queues_t& queues, optional_yield y) {
    constexpr auto max_chunk = 1024U;
    std::string start_after;
    bool more = true;
    int rval;
    while (more) {
      librados::ObjectReadOperation op;
      queues_t queues_chunk;
      op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
      const auto ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
      if (ret == -ENOENT) {
        // queue list object was not created - nothing to do
        return 0;
      }
      if (ret < 0) {
        // TODO: do we need to check on rval as well as ret?
        ldpp_dout(this, 1) << "ERROR: failed to read queue list. error: " << ret << dendl;
        return ret;
      }
      if (!queues_chunk.empty()) {
        // advance the marker so that the next chunk starts after the last key read
        start_after = *queues_chunk.crbegin();
      }
      queues.merge(queues_chunk);
    }
    return 0;
  }

  // set m1 to be the minimum between m1 and m2
  static int set_min_marker(std::string& m1, const std::string m2) {
    cls_queue_marker mr1;
    cls_queue_marker mr2;
    if (mr1.from_str(m1.c_str()) < 0 || mr2.from_str(m2.c_str()) < 0) {
      return -EINVAL;
    }
    if (mr2.gen <= mr1.gen && mr2.offset < mr1.offset) {
      m1 = m2;
    }
    return 0;
  }

  using Clock = ceph::coarse_mono_clock;
  using Executor = boost::asio::io_context::executor_type;
  using Timer = boost::asio::basic_waitable_timer<Clock,
        boost::asio::wait_traits<Clock>, Executor>;

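  // tokens_waiter lets one coroutine wait until all other coroutines holding a token have
  // finished: each spawned entry-processing coroutine keeps a token alive for its lifetime,
  // and when the last token is destroyed the timer is cancelled, waking the coroutine blocked
  // in async_wait() (hence the assert on operation_canceled)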
  class tokens_waiter {
    const std::chrono::hours infinite_duration;
    size_t pending_tokens;
    Timer timer;

    struct token {
      tokens_waiter& waiter;
      token(tokens_waiter& _waiter) : waiter(_waiter) {
        ++waiter.pending_tokens;
      }

      ~token() {
        --waiter.pending_tokens;
        if (waiter.pending_tokens == 0) {
          waiter.timer.cancel();
        }
      }
    };

  public:

    tokens_waiter(boost::asio::io_context& io_context) :
      infinite_duration(1000),
      pending_tokens(0),
      timer(io_context) {}

    void async_wait(yield_context yield) {
      if (pending_tokens == 0) {
        return;
      }
      timer.expires_from_now(infinite_duration);
      boost::system::error_code ec;
      timer.async_wait(yield[ec]);
      ceph_assert(ec == boost::system::errc::operation_canceled);
    }

    token make_token() {
      return token(*this);
    }
  };

  // processing of a specific entry
  // return whether processing was successful (true) or not (false)
  bool process_entry(const cls_queue_entry& entry, yield_context yield) {
    event_entry_t event_entry;
    auto iter = entry.data.cbegin();
    try {
      decode(event_entry, iter);
    } catch (buffer::error& err) {
      ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
      return false;
    }
    try {
      // TODO move endpoint creation to queue level
      const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
          RGWHTTPArgs(event_entry.push_endpoint_args, this),
          cct);
      ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
        " for entry: " << entry.marker << dendl;
      const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
      if (ret < 0) {
        ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
          << " failed. error: " << ret << " (will retry)" << dendl;
        return false;
      } else {
        ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
          << " ok" << dendl;
        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
        return true;
      }
    } catch (const RGWPubSubEndpoint::configuration_error& e) {
      ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: "
          << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry)" << dendl;
      return false;
    }
  }

  // clean stale reservations from the queue
  void cleanup_queue(const std::string& queue_name, yield_context yield) {
    while (true) {
      ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
      const auto now = ceph::coarse_real_time::clock::now();
      const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
      librados::ObjectWriteOperation op;
      op.assert_exists();
      rados::cls::lock::assert_locked(&op, queue_name+"_lock",
        ClsLockType::EXCLUSIVE,
        lock_cookie,
        "" /*no tag*/);
      cls_2pc_queue_expire_reservations(op, stale_time);
      // check ownership and do reservation cleanup in one batch
      auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
      if (ret == -ENOENT) {
        // queue was deleted
        ldpp_dout(this, 5) << "INFO: queue: "
          << queue_name << " was removed. cleanup will stop" << dendl;
        return;
      }
      if (ret == -EBUSY) {
        ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
        return;
      }
      if (ret < 0) {
        ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
          << ". error: " << ret << dendl;
      }
      Timer timer(io_context);
      timer.expires_from_now(std::chrono::seconds(reservations_cleanup_period_s));
      boost::system::error_code ec;
      timer.async_wait(yield[ec]);
    }
  }

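  // per-queue processing loop: list up to max_elements entries under the ownership lock,
  // spawn one coroutine per entry to push it to its endpoint, then remove from the queue all
  // entries up to the minimal marker that failed, so that failed entries are retried on the
  // next iteration; a separate coroutine periodically expires stale reservations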
  // processing of a specific queue
  void process_queue(const std::string& queue_name, yield_context yield) {
    constexpr auto max_elements = 1024;
    auto is_idle = false;
    const std::string start_marker;

    // start the cleanup coroutine for the queue
    spawn::spawn(io_context, [this, queue_name](yield_context yield) {
          cleanup_queue(queue_name, yield);
        }, make_stack_allocator());

    while (true) {
      // if queue was empty the last time, sleep for idle timeout
      if (is_idle) {
        Timer timer(io_context);
        timer.expires_from_now(std::chrono::microseconds(queue_idle_sleep_us));
        boost::system::error_code ec;
        timer.async_wait(yield[ec]);
      }

      // get list of entries in the queue
      is_idle = true;
      bool truncated = false;
      std::string end_marker;
      std::vector<cls_queue_entry> entries;
      auto total_entries = 0U;
      {
        librados::ObjectReadOperation op;
        op.assert_exists();
        bufferlist obl;
        int rval;
        rados::cls::lock::assert_locked(&op, queue_name+"_lock",
          ClsLockType::EXCLUSIVE,
          lock_cookie,
          "" /*no tag*/);
        cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval);
        // check ownership and list entries in one batch
        auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
        if (ret == -ENOENT) {
          // queue was deleted
          ldpp_dout(this, 5) << "INFO: queue: "
            << queue_name << " was removed. processing will stop" << dendl;
          return;
        }
        if (ret == -EBUSY) {
          ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
          return;
        }
        if (ret < 0) {
          ldpp_dout(this, 5) << "WARNING: failed to get list of entries in queue and/or lock queue: "
            << queue_name << ". error: " << ret << " (will retry)" << dendl;
          continue;
        }
        ret = cls_2pc_queue_list_entries_result(obl, entries, &truncated, end_marker);
        if (ret < 0) {
          ldpp_dout(this, 5) << "WARNING: failed to parse list of entries in queue: "
            << queue_name << ". error: " << ret << " (will retry)" << dendl;
          continue;
        }
      }
      total_entries = entries.size();
      if (total_entries == 0) {
        // nothing in the queue
        continue;
      }
      // log when queue is not idle
      ldpp_dout(this, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name <<
        ". end marker is: " << end_marker << dendl;

      is_idle = false;
      auto has_error = false;
      auto remove_entries = false;
      auto entry_idx = 1U;
      tokens_waiter waiter(io_context);
      for (auto& entry : entries) {
        if (has_error) {
          // bail out on first error
          break;
        }
        // TODO pass entry pointer instead of by-value
        spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](yield_context yield) {
            const auto token = waiter.make_token();
            if (process_entry(entry, yield)) {
              ldpp_dout(this, 20) << "INFO: processing of entry: " <<
                entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl;
              remove_entries = true;
            } else {
              if (set_min_marker(end_marker, entry.marker) < 0) {
                ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
              } else {
                ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
              }
              has_error = true;
              ldpp_dout(this, 20) << "INFO: processing of entry: " <<
                entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
            }
          }, make_stack_allocator());
        ++entry_idx;
      }

      // wait for all pending work to finish
      waiter.async_wait(yield);

      // delete all published entries from the queue
      if (remove_entries) {
        librados::ObjectWriteOperation op;
        op.assert_exists();
        rados::cls::lock::assert_locked(&op, queue_name+"_lock",
          ClsLockType::EXCLUSIVE,
          lock_cookie,
          "" /*no tag*/);
        cls_2pc_queue_remove_entries(op, end_marker);
        // check ownership and delete entries in one batch
        const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
        if (ret == -ENOENT) {
          // queue was deleted
          ldpp_dout(this, 5) << "INFO: queue: "
            << queue_name << " was removed. processing will stop" << dendl;
          return;
        }
        if (ret == -EBUSY) {
          ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
          return;
        }
        if (ret < 0) {
          ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker << " from queue: "
            << queue_name << ". error: " << ret << dendl;
        } else {
          ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker << " from queue: "
            << queue_name << dendl;
        }
      }
    }
  }

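  // queue ownership: each queue is guarded by a cls_lock object lock named "<queue>_lock";
  // the daemon holding the lock processes the queue, the lock is renewed on every iteration
  // of process_queues(), and failover_time bounds how long a crashed daemon keeps ownership
  // before another RGW can take over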
  // list of owned queues
  using owned_queues_t = std::unordered_set<std::string>;

  // process all queues
  // find which of the queues is owned by this daemon and process it
  void process_queues(yield_context yield) {
    auto has_error = false;
    owned_queues_t owned_queues;

    // add randomness to the duration between queue checking
    // to make sure that different daemons are not synced
    std::random_device seed;
    std::mt19937 rnd_gen(seed());
    const auto min_jitter = 100; // ms
    const auto max_jitter = 500; // ms
    std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter);

    std::vector<std::string> queue_gc;
    std::mutex queue_gc_lock;
    while (true) {
      Timer timer(io_context);
      const auto duration = (has_error ?
        std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) +
        std::chrono::milliseconds(duration_jitter(rnd_gen));
      timer.expires_from_now(duration);
      const auto tp = ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration);
      ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp) << dendl;
      boost::system::error_code ec;
      timer.async_wait(yield[ec]);

      queues_t queues;
      auto ret = read_queue_list(queues, optional_yield(io_context, yield));
      if (ret < 0) {
        has_error = true;
        continue;
      }

      for (const auto& queue_name : queues) {
        // try to lock the queue to check if it is owned by this rgw
        // or if ownership needs to be taken
        librados::ObjectWriteOperation op;
        op.assert_exists();
        rados::cls::lock::lock(&op, queue_name+"_lock",
          ClsLockType::EXCLUSIVE,
          lock_cookie,
          "" /*no tag*/,
          "" /*no description*/,
          failover_time,
          LOCK_FLAG_MAY_RENEW);

        ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
        if (ret == -EBUSY) {
          // lock is already taken by another RGW
          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
          // if queue was owned by this RGW, processing should be stopped, queue would be deleted from list afterwards
          continue;
        }
        if (ret == -ENOENT) {
          // queue is deleted - processing will stop the next time we try to read from the queue
          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " should not be locked - already deleted" << dendl;
          continue;
        }
        if (ret < 0) {
          // failed to lock for another reason, continue to process other queues
          ldpp_dout(this, 1) << "ERROR: failed to lock queue: " << queue_name << ". error: " << ret << dendl;
          has_error = true;
          continue;
        }
        // add queue to list of owned queues
        if (owned_queues.insert(queue_name).second) {
          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
          // start processing this queue
          spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](yield_context yield) {
                process_queue(queue_name, yield);
                // if queue processing ended, it means that the queue was removed or is no longer owned
                // mark it for deletion
                std::lock_guard lock_guard(queue_gc_lock);
                queue_gc.push_back(queue_name);
                ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
              }, make_stack_allocator());
        } else {
          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl;
        }
      }
      // erase all queues that were deleted
      {
        std::lock_guard lock_guard(queue_gc_lock);
        std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
          owned_queues.erase(queue_name);
          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
        });
        queue_gc.clear();
      }
    }
  }

public:

  ~Manager() {
    work_guard.reset();
    io_context.stop();
    std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
  }

  // ctor: start all threads
  Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms,
          uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, uint32_t failover_time_ms,
          uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
          uint32_t _worker_count, rgw::sal::RadosStore* store) :
    max_queue_size(_max_queue_size),
    queues_update_period_ms(_queues_update_period_ms),
    queues_update_retry_ms(_queues_update_retry_ms),
    queue_idle_sleep_us(_queue_idle_sleep_us),
    failover_time(std::chrono::milliseconds(failover_time_ms)),
    cct(_cct),
    lock_cookie(gen_rand_alphanumeric(cct, COOKIE_LEN)),
    work_guard(boost::asio::make_work_guard(io_context)),
    worker_count(_worker_count),
    stale_reservations_period_s(_stale_reservations_period_s),
    reservations_cleanup_period_s(_reservations_cleanup_period_s),
    rados_ioctx(store->getRados()->get_notif_pool_ctx())
  {
    spawn::spawn(io_context, [this] (yield_context yield) {
          process_queues(yield);
        }, make_stack_allocator());

    // start the worker threads to do the actual queue processing
    const std::string WORKER_THREAD_NAME = "notif-worker";
    for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
      workers.emplace_back([this]() {
        try {
          io_context.run();
        } catch (const std::exception& err) {
          ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl;
          throw;
        }
      });
      const auto rc = ceph_pthread_setname(workers.back().native_handle(),
        (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
      ceph_assert(rc == 0);
    }
    ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
  }

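  // create the queue for a persistent topic and register it as an omap key on the queue list
  // object; creation uses exclusive create, so adding an already existing topic is a no-op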
  int add_persistent_topic(const std::string& topic_name, optional_yield y) {
    if (topic_name == Q_LIST_OBJECT_NAME) {
      ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
      return -EINVAL;
    }
    librados::ObjectWriteOperation op;
    op.create(true);
    cls_2pc_queue_init(op, topic_name, max_queue_size);
    auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
    if (ret == -EEXIST) {
      // queue already exists - nothing to do
      ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl;
      return 0;
    }
    if (ret < 0) {
      // failed to create queue
      ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl;
      return ret;
    }

    bufferlist empty_bl;
    std::map<std::string, bufferlist> new_topic{{topic_name, empty_bl}};
    op.omap_set(new_topic);
    ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
    if (ret < 0) {
      ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl;
      return ret;
    }
    ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list" << dendl;
    return 0;
  }
};

// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;

constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30; // check queue list every 30 seconds
constexpr uint32_t Q_LIST_RETRY_MSEC = 1000; // retry every second if queue list update failed
constexpr uint32_t IDLE_TIMEOUT_USEC = 100*1000; // idle sleep 100ms
constexpr uint32_t FAILOVER_TIME_MSEC = 3*Q_LIST_UPDATE_MSEC; // failover time is 3x the lock renew (queue list update) period
constexpr uint32_t WORKER_COUNT = 1; // 1 worker thread
constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120; // cleanup reservations that are more than 2 minutes old
constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds

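// typical lifecycle (a sketch, assuming an initialized RadosStore and a valid optional_yield):
//   rgw::notify::init(cct, store, dpp);                  // start the manager and its worker threads
//   rgw::notify::add_persistent_topic("my-topic", y);    // create the backing queue for a topic
//   rgw::notify::remove_persistent_topic("my-topic", y); // drop the queue when the topic is deleted
//   rgw::notify::shutdown();                             // stop the manager
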
bool init(CephContext* cct, rgw::sal::RadosStore* store, const DoutPrefixProvider *dpp) {
  if (s_manager) {
    return false;
  }
  // TODO: take conf from CephContext
  s_manager = new Manager(cct, MAX_QUEUE_SIZE,
      Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC,
      IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
      STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
      WORKER_COUNT,
      store);
  return true;
}

void shutdown() {
  delete s_manager;
  s_manager = nullptr;
}

int add_persistent_topic(const std::string& topic_name, optional_yield y) {
  if (!s_manager) {
    return -EAGAIN;
  }
  return s_manager->add_persistent_topic(topic_name, y);
}

int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_name, optional_yield y) {
  librados::ObjectWriteOperation op;
  op.remove();
  auto ret = rgw_rados_operate(dpp, rados_ioctx, topic_name, &op, y);
  if (ret == -ENOENT) {
    // queue already removed - nothing to do
    ldpp_dout(dpp, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
    return 0;
  }
  if (ret < 0) {
    // failed to remove queue
    ldpp_dout(dpp, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
    return ret;
  }

  std::set<std::string> topic_to_remove{{topic_name}};
  op.omap_rm_keys(topic_to_remove);
  ret = rgw_rados_operate(dpp, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
  if (ret < 0) {
    ldpp_dout(dpp, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
    return ret;
  }
  ldpp_dout(dpp, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl;
  return 0;
}

int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
  if (!s_manager) {
    return -EAGAIN;
  }
  return remove_persistent_topic(s_manager, s_manager->rados_ioctx, topic_name, y);
}

rgw::sal::Object* get_object_with_atttributes(
  const reservation_t& res, rgw::sal::Object* obj) {
  // in case of copy obj, the tags and metadata are taken from source
  const auto src_obj = res.src_object ? res.src_object : obj;
  if (src_obj->get_attrs().empty()) {
    if (!src_obj->get_bucket()) {
      src_obj->set_bucket(res.bucket);
    }
    const auto ret = src_obj->get_obj_attrs(res.yield, res.dpp);
    if (ret < 0) {
      ldpp_dout(res.dpp, 20) << "failed to get attributes from object: " <<
        src_obj->get_key() << ". ret = " << ret << dendl;
      return nullptr;
    }
  }
  return src_obj;
}

static inline void filter_amz_meta(meta_map_t& dest, const meta_map_t& src) {
  std::copy_if(src.cbegin(), src.cend(),
               std::inserter(dest, dest.end()),
               [](const auto& m) {
                 return (boost::algorithm::starts_with(m.first, RGW_AMZ_META_PREFIX));
               });
}

static inline void metadata_from_attributes(
  reservation_t& res, rgw::sal::Object* obj) {
  auto& metadata = res.x_meta_map;
  const auto src_obj = get_object_with_atttributes(res, obj);
  if (!src_obj) {
    return;
  }
  res.metadata_fetched_from_attributes = true;
  for (auto& attr : src_obj->get_attrs()) {
    if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
      std::string_view key(attr.first);
      key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1);
      // we want to pass a null terminated version
      // of the bufferlist, hence "to_str().c_str()"
      metadata.emplace(key, attr.second.to_str().c_str());
    }
  }
}

static inline void tags_from_attributes(
  const reservation_t& res, rgw::sal::Object* obj, KeyMultiValueMap& tags) {
  const auto src_obj = get_object_with_atttributes(res, obj);
  if (!src_obj) {
    return;
  }
  const auto& attrs = src_obj->get_attrs();
  const auto attr_iter = attrs.find(RGW_ATTR_TAGS);
  if (attr_iter != attrs.end()) {
    auto bliter = attr_iter->second.cbegin();
    RGWObjTags obj_tags;
    try {
      ::decode(obj_tags, bliter);
    } catch(buffer::error&) {
      // not able to decode tags
      return;
    }
    tags = std::move(obj_tags.get_tags());
  }
}

// populate event from request
static inline void populate_event(reservation_t& res,
                                  rgw::sal::Object* obj,
                                  uint64_t size,
                                  const ceph::real_time& mtime,
                                  const std::string& etag,
                                  const std::string& version,
                                  EventType event_type,
                                  rgw_pubsub_s3_event& event) {
  event.eventTime = mtime;
  event.eventName = to_event_string(event_type);
  event.userIdentity = res.user_id;    // user that triggered the change
  event.x_amz_request_id = res.req_id; // request ID of the original change
  event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made
  // configurationId is filled from notification configuration
  event.bucket_name = res.bucket->get_name();
  event.bucket_ownerIdentity = res.bucket->get_owner() ?
    res.bucket->get_owner()->get_id().id : res.bucket->get_info().owner.id;
  const auto region = res.store->get_zone()->get_zonegroup().get_api_name();
  rgw::ARN bucket_arn(res.bucket->get_key());
  bucket_arn.region = region;
  event.bucket_arn = to_string(bucket_arn);
  event.object_key = res.object_name ? *res.object_name : obj->get_name();
  event.object_size = size;
  event.object_etag = etag;
  event.object_versionId = version;
  event.awsRegion = region;
  // use timestamp as per key sequence id (hex encoded)
  const utime_t ts(real_clock::now());
  boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
          std::back_inserter(event.object_sequencer));
  set_event_id(event.id, etag, ts);
  event.bucket_id = res.bucket->get_bucket_id();
  // pass metadata
  if (!res.metadata_fetched_from_attributes) {
    // either no metadata exists or no metadata filter was used
    metadata_from_attributes(res, obj);
  }
  event.x_meta_map = res.x_meta_map;
  // pass tags
  if (!res.tagset ||
      (*res.tagset).get_tags().empty()) {
    // try to fetch the tags from the attributes
    tags_from_attributes(res, obj, event.tags);
  } else {
    event.tags = (*res.tagset).get_tags();
  }
  // opaque data will be filled from topic configuration
}

static inline bool notification_match(reservation_t& res,
                                      const rgw_pubsub_topic_filter& filter,
                                      EventType event,
                                      const RGWObjTags* req_tags) {
  if (!match(filter.events, event)) {
    return false;
  }
  const auto obj = res.object;
  if (!match(filter.s3_filter.key_filter,
             res.object_name ? *res.object_name : obj->get_name())) {
    return false;
  }

  if (!filter.s3_filter.metadata_filter.kv.empty()) {
    // metadata filter exists
    if (res.s) {
      filter_amz_meta(res.x_meta_map, res.s->info.x_meta_map);
    }
    metadata_from_attributes(res, obj);
    if (!match(filter.s3_filter.metadata_filter, res.x_meta_map)) {
      return false;
    }
  }

  if (!filter.s3_filter.tag_filter.kv.empty()) {
    // tag filter exists
    if (req_tags) {
      // tags in the request
      if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) {
        return false;
      }
    } else if (res.tagset && !(*res.tagset).get_tags().empty()) {
      // tags were cached in req_state
      if (!match(filter.s3_filter.tag_filter, (*res.tagset).get_tags())) {
        return false;
      }
    } else {
      // try to fetch tags from the attributes
      KeyMultiValueMap tags;
      tags_from_attributes(res, obj, tags);
      if (!match(filter.s3_filter.tag_filter, tags)) {
        return false;
      }
    }
  }

  return true;
}

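// reservation phase of the two-phase publish: for every topic whose filter matches the event,
// reserve space in the topic's persistent queue (or just record the topic for non-persistent
// delivery); the matching commit is done in publish_commit(), and a reservation that is never
// committed is released by publish_abort()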
int publish_reserve(const DoutPrefixProvider* dpp,
                    EventType event_type,
                    reservation_t& res,
                    const RGWObjTags* req_tags)
{
  const RGWPubSub ps(res.store, res.user_tenant);
  const RGWPubSub::Bucket ps_bucket(ps, res.bucket);
  rgw_pubsub_bucket_topics bucket_topics;
  auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield);
  if (rc < 0) {
    // failed to fetch bucket topics
    return rc;
  }
  for (const auto& bucket_topic : bucket_topics.topics) {
    const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
    const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
    if (!notification_match(res, topic_filter, event_type, req_tags)) {
      // notification does not apply to req_state
      continue;
    }
    ldpp_dout(res.dpp, 20) << "INFO: notification: '" << topic_filter.s3_id <<
        "' on topic: '" << topic_cfg.dest.arn_topic <<
        "' and bucket: '" << res.bucket->get_name() <<
        "' (unique topic: '" << topic_cfg.name <<
        "') apply to event of type: '" << to_string(event_type) << "'" << dendl;

    cls_2pc_reservation::id_t res_id;
    if (topic_cfg.dest.persistent) {
      // TODO: take default reservation size from conf
      constexpr auto DEFAULT_RESERVATION = 4*1024U; // 4K
      res.size = DEFAULT_RESERVATION;
      librados::ObjectWriteOperation op;
      bufferlist obl;
      int rval;
      const auto& queue_name = topic_cfg.dest.arn_topic;
      cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
      auto ret = rgw_rados_operate(
        res.dpp, res.store->getRados()->get_notif_pool_ctx(),
        queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
      if (ret < 0) {
        ldpp_dout(res.dpp, 1) <<
          "ERROR: failed to reserve notification on queue: "
          << queue_name << ". error: " << ret << dendl;
        // if no space is left in queue we ask client to slow down
        return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
      }
      ret = cls_2pc_queue_reserve_result(obl, res_id);
      if (ret < 0) {
        ldpp_dout(res.dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
        return ret;
      }
    }
    res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id);
  }
  return 0;
}

int publish_commit(rgw::sal::Object* obj,
                   uint64_t size,
                   const ceph::real_time& mtime,
                   const std::string& etag,
                   const std::string& version,
                   EventType event_type,
                   reservation_t& res,
                   const DoutPrefixProvider* dpp)
{
  for (auto& topic : res.topics) {
    if (topic.cfg.dest.persistent &&
        topic.res_id == cls_2pc_reservation::NO_ID) {
      // nothing to commit or already committed/aborted
      continue;
    }
    event_entry_t event_entry;
    populate_event(res, obj, size, mtime, etag, version, event_type, event_entry.event);
    event_entry.event.configurationId = topic.configurationId;
    event_entry.event.opaque_data = topic.cfg.opaque_data;
    if (topic.cfg.dest.persistent) {
      event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
      event_entry.push_endpoint_args =
        std::move(topic.cfg.dest.push_endpoint_args);
      event_entry.arn_topic = topic.cfg.dest.arn_topic;
      bufferlist bl;
      encode(event_entry, bl);
      const auto& queue_name = topic.cfg.dest.arn_topic;
      if (bl.length() > res.size) {
        // try to make a larger reservation, fail only if this is not possible
        ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length()
                          << " exceeded reserved size: " << res.size
                          << ". trying to make a larger reservation on queue: " << queue_name
                          << dendl;
        // first cancel the existing reservation
        librados::ObjectWriteOperation op;
        cls_2pc_queue_abort(op, topic.res_id);
        auto ret = rgw_rados_operate(
          dpp, res.store->getRados()->get_notif_pool_ctx(),
          topic.cfg.dest.arn_topic, &op,
          res.yield);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: "
                            << topic.res_id <<
            " when trying to make a larger reservation on queue: " << queue_name
                            << ". error: " << ret << dendl;
          return ret;
        }
        // now try to make a bigger one
        buffer::list obl;
        int rval;
        cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
        ret = rgw_rados_operate(
          dpp, res.store->getRados()->get_notif_pool_ctx(),
          queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: "
                            << queue_name
                            << ". error: " << ret << dendl;
          return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
        }
        ret = cls_2pc_queue_reserve_result(obl, topic.res_id);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for "
            "extra space. error: " << ret << dendl;
          return ret;
        }
      }
      std::vector<buffer::list> bl_data_vec{std::move(bl)};
      librados::ObjectWriteOperation op;
      cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
      const auto ret = rgw_rados_operate(
        dpp, res.store->getRados()->get_notif_pool_ctx(),
        queue_name, &op, res.yield);
      topic.res_id = cls_2pc_reservation::NO_ID;
      if (ret < 0) {
        ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
                          << queue_name << ". error: " << ret
                          << dendl;
        return ret;
      }
    } else {
      try {
        // TODO add endpoint LRU cache
        const auto push_endpoint = RGWPubSubEndpoint::create(
          topic.cfg.dest.push_endpoint,
          topic.cfg.dest.arn_topic,
          RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp),
          dpp->get_cct());
        ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: "
                               << topic.cfg.dest.push_endpoint << dendl;
        const auto ret = push_endpoint->send_to_completion_async(
          dpp->get_cct(), event_entry.event, res.yield);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: push to endpoint "
                            << topic.cfg.dest.push_endpoint
                            << " failed. error: " << ret << dendl;
          if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
          return ret;
        }
        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
      } catch (const RGWPubSubEndpoint::configuration_error& e) {
        ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: "
            << topic.cfg.dest.push_endpoint << ". error: " << e.what() << dendl;
        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
        return -EINVAL;
      }
    }
  }
  return 0;
}

int publish_abort(reservation_t& res) {
  for (auto& topic : res.topics) {
    if (!topic.cfg.dest.persistent ||
        topic.res_id == cls_2pc_reservation::NO_ID) {
      // nothing to abort or already committed/aborted
      continue;
    }
    const auto& queue_name = topic.cfg.dest.arn_topic;
    librados::ObjectWriteOperation op;
    cls_2pc_queue_abort(op, topic.res_id);
    const auto ret = rgw_rados_operate(
      res.dpp, res.store->getRados()->get_notif_pool_ctx(),
      queue_name, &op, res.yield);
    if (ret < 0) {
      ldpp_dout(res.dpp, 1) << "ERROR: failed to abort reservation: "
                            << topic.res_id <<
        " from queue: " << queue_name << ". error: " << ret << dendl;
      return ret;
    }
    topic.res_id = cls_2pc_reservation::NO_ID;
  }
  return 0;
}

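// reservation_t is an RAII holder for the reservations made in publish_reserve(): its
// destructor (below) calls publish_abort(), so reservations that were never committed
// (e.g. when the request fails after reserving) are released automatically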
reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
                             rgw::sal::RadosStore* _store,
                             const req_state* _s,
                             rgw::sal::Object* _object,
                             rgw::sal::Object* _src_object,
                             const std::string* _object_name,
                             optional_yield y) :
  dpp(_s), store(_store), s(_s), size(0) /* XXX */,
  object(_object), src_object(_src_object), bucket(_s->bucket.get()),
  object_name(_object_name),
  tagset(_s->tagset),
  metadata_fetched_from_attributes(false),
  user_id(_s->user->get_id().id),
  user_tenant(_s->user->get_id().tenant),
  req_id(_s->req_id),
  yield(y)
{
  filter_amz_meta(x_meta_map, _s->info.x_meta_map);
}

reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
                             rgw::sal::RadosStore* _store,
                             rgw::sal::Object* _object,
                             rgw::sal::Object* _src_object,
                             rgw::sal::Bucket* _bucket,
                             const std::string& _user_id,
                             const std::string& _user_tenant,
                             const std::string& _req_id,
                             optional_yield y) :
  dpp(_dpp), store(_store), s(nullptr), size(0) /* XXX */,
  object(_object), src_object(_src_object), bucket(_bucket),
  object_name(nullptr),
  metadata_fetched_from_attributes(false),
  user_id(_user_id),
  user_tenant(_user_tenant),
  req_id(_req_id),
  yield(y)
{}

reservation_t::~reservation_t() {
  publish_abort(*this);
}

} // namespace rgw::notify