ceph/src/rgw/rgw_notify.cc (ceph.git, quincy beta 17.1.0)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_notify.h"
5 #include "cls/2pc_queue/cls_2pc_queue_client.h"
6 #include "cls/lock/cls_lock_client.h"
7 #include <memory>
8 #include <boost/algorithm/hex.hpp>
9 #include <boost/context/protected_fixedsize_stack.hpp>
10 #include <spawn/spawn.hpp>
11 #include "rgw_sal_rados.h"
12 #include "rgw_pubsub.h"
13 #include "rgw_pubsub_push.h"
14 #include "rgw_perf_counters.h"
15 #include "common/dout.h"
16 #include <chrono>
17
18 #define dout_subsys ceph_subsys_rgw
19
20 namespace rgw::notify {
21
22 struct event_entry_t {
23 rgw_pubsub_s3_event event;
24 std::string push_endpoint;
25 std::string push_endpoint_args;
26 std::string arn_topic;
27
28 void encode(bufferlist& bl) const {
29 ENCODE_START(1, 1, bl);
30 encode(event, bl);
31 encode(push_endpoint, bl);
32 encode(push_endpoint_args, bl);
33 encode(arn_topic, bl);
34 ENCODE_FINISH(bl);
35 }
36
37 void decode(bufferlist::const_iterator& bl) {
38 DECODE_START(1, bl);
39 decode(event, bl);
40 decode(push_endpoint, bl);
41 decode(push_endpoint_args, bl);
42 decode(arn_topic, bl);
43 DECODE_FINISH(bl);
44 }
45 };
46 WRITE_CLASS_ENCODER(event_entry_t)
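// (editor's note) illustrative sketch, not part of the original file: how an
// event_entry_t round-trips through a bufferlist via the encode/decode
// overloads generated by WRITE_CLASS_ENCODER above. The function name and
// sample values are hypothetical.
inline void event_entry_roundtrip_example() {
  event_entry_t in;
  in.push_endpoint = "http://127.0.0.1:8080";
  in.arn_topic = "mytopic";
  bufferlist bl;
  encode(in, bl);            // event_entry_t::encode() via the generated overload
  event_entry_t out;
  auto iter = bl.cbegin();
  decode(out, iter);         // event_entry_t::decode() via the generated overload
  ceph_assert(out.push_endpoint == in.push_endpoint && out.arn_topic == in.arn_topic);
}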
47
48 using queues_t = std::set<std::string>;
49
50 // use mmap/mprotect to allocate 128k coroutine stacks
51 auto make_stack_allocator() {
52 return boost::context::protected_fixedsize_stack{128*1024};
53 }
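// (editor's note) boost::context::protected_fixedsize_stack allocates each
// coroutine stack with mmap and mprotect()s a guard page at its end, so a
// coroutine that overflows its 128k stack faults immediately instead of
// silently corrupting adjacent memory.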
54
55 class Manager : public DoutPrefixProvider {
56 const size_t max_queue_size;
57 const uint32_t queues_update_period_ms;
58 const uint32_t queues_update_retry_ms;
59 const uint32_t queue_idle_sleep_us;
60 const utime_t failover_time;
61 CephContext* const cct;
62 librados::IoCtx& rados_ioctx;
63 static constexpr auto COOKIE_LEN = 16;
64 const std::string lock_cookie;
65 boost::asio::io_context io_context;
66 boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard;
67 const uint32_t worker_count;
68 std::vector<std::thread> workers;
69 const uint32_t stale_reservations_period_s;
70 const uint32_t reservations_cleanup_period_s;
71
72 const std::string Q_LIST_OBJECT_NAME = "queues_list_object";
73
74 CephContext *get_cct() const override { return cct; }
75 unsigned get_subsys() const override { return dout_subsys; }
76 std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw notify: "; }
77
78 // read the list of queues from the queue list object
79 int read_queue_list(queues_t& queues, optional_yield y) {
80 constexpr auto max_chunk = 1024U;
81 std::string start_after;
82 bool more = true;
83 int rval;
84 while (more) {
85 librados::ObjectReadOperation op;
86 queues_t queues_chunk;
87 op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
88 const auto ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
89 if (ret == -ENOENT) {
90 // queue list object was not created - nothing to do
91 return 0;
92 }
93 if (ret < 0) {
94 // TODO: do we need to check on rval as well as ret?
95 ldpp_dout(this, 1) << "ERROR: failed to read queue list. error: " << ret << dendl;
96 return ret;
97 }
98 queues.merge(queues_chunk);
99 }
100 return 0;
101 }
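// (editor's note) the queue list is the omap of a single RADOS object: each
// omap key is a queue (topic) name with an empty value. add_persistent_topic()
// below registers one key per topic, and the loop above pages through the keys
// 1024 at a time until omap_get_keys2() reports there are no more.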
102
103 // set m1 to be the minimum between m1 and m2
104 static int set_min_marker(std::string& m1, const std::string m2) {
105 cls_queue_marker mr1;
106 cls_queue_marker mr2;
107 if (mr1.from_str(m1.c_str()) < 0 || mr2.from_str(m2.c_str()) < 0) {
108 return -EINVAL;
109 }
110 if (mr2.gen <= mr1.gen && mr2.offset < mr1.offset) {
111 m1 = m2;
112 }
113 return 0;
114 }
115
116 using Clock = ceph::coarse_mono_clock;
117 using Executor = boost::asio::io_context::executor_type;
118 using Timer = boost::asio::basic_waitable_timer<Clock,
119 boost::asio::wait_traits<Clock>, Executor>;
120
121 class tokens_waiter {
122 const std::chrono::hours infinite_duration;
123 size_t pending_tokens;
124 Timer timer;
125
126 struct token {
127 tokens_waiter& waiter;
128 token(tokens_waiter& _waiter) : waiter(_waiter) {
129 ++waiter.pending_tokens;
130 }
131
132 ~token() {
133 --waiter.pending_tokens;
134 if (waiter.pending_tokens == 0) {
135 waiter.timer.cancel();
136 }
137 }
138 };
139
140 public:
141
142 tokens_waiter(boost::asio::io_context& io_context) :
143 infinite_duration(1000),
144 pending_tokens(0),
145 timer(io_context) {}
146
147 void async_wait(yield_context yield) {
148 if (pending_tokens == 0) {
149 return;
150 }
151 timer.expires_from_now(infinite_duration);
152 boost::system::error_code ec;
153 timer.async_wait(yield[ec]);
154 ceph_assert(ec == boost::system::errc::operation_canceled);
155 }
156
157 token make_token() {
158 return token(*this);
159 }
160 };
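// (editor's note) illustrative sketch, not part of the original file: the
// fork/join pattern tokens_waiter is built for. Each spawned coroutine holds a
// token; async_wait() returns once the last token is destroyed, which cancels
// the (practically infinite) timer. The function name is hypothetical.
void fork_join_example(yield_context yield) {
  tokens_waiter waiter(io_context);
  for (auto i = 0; i < 3; ++i) {
    spawn::spawn(yield, [&waiter](yield_context y) {
      const auto token = waiter.make_token();   // ++pending_tokens
      // ... do asynchronous work using y ...
    }, make_stack_allocator());                 // token dtor: --pending_tokens
  }
  waiter.async_wait(yield);                     // returns after the last token is gone
}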
161
162 // processing of a specific entry
163 // return whether processing was successful (true) or not (false)
164 bool process_entry(const cls_queue_entry& entry, yield_context yield) {
165 event_entry_t event_entry;
166 auto iter = entry.data.cbegin();
167 try {
168 decode(event_entry, iter);
169 } catch (buffer::error& err) {
170 ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
171 return false;
172 }
173 try {
174 // TODO move endpoint creation to queue level
175 const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
176 RGWHTTPArgs(event_entry.push_endpoint_args, this),
177 cct);
178 ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
179 " for entry: " << entry.marker << dendl;
180 const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
181 if (ret < 0) {
182 ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
183 << " failed. error: " << ret << " (will retry)" << dendl;
184 return false;
185 } else {
186 ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
187 << " ok" << dendl;
188 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
189 return true;
190 }
191 } catch (const RGWPubSubEndpoint::configuration_error& e) {
192 ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: "
193 << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
194 return false;
195 }
196 }
197
198 // clean stale reservations from the queue
199 void cleanup_queue(const std::string& queue_name, yield_context yield) {
200 while (true) {
201 ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
202 const auto now = ceph::coarse_real_time::clock::now();
203 const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
204 librados::ObjectWriteOperation op;
205 op.assert_exists();
206 rados::cls::lock::assert_locked(&op, queue_name+"_lock",
207 ClsLockType::EXCLUSIVE,
208 lock_cookie,
209 "" /*no tag*/);
210 cls_2pc_queue_expire_reservations(op, stale_time);
211 // check ownership and do reservation cleanup in one batch
212 auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
213 if (ret == -ENOENT) {
214 // queue was deleted
215 ldpp_dout(this, 5) << "INFO: queue: "
216 << queue_name << ". was removed. cleanup will stop" << dendl;
217 return;
218 }
219 if (ret == -EBUSY) {
220 ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
221 return;
222 }
223 if (ret < 0) {
224 ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
225 << ". error: " << ret << dendl;
226 }
227 Timer timer(io_context);
228 timer.expires_from_now(std::chrono::seconds(reservations_cleanup_period_s));
229 boost::system::error_code ec;
230 timer.async_wait(yield[ec]);
231 }
232 }
233
234 // processing of a specific queue
235 void process_queue(const std::string& queue_name, yield_context yield) {
236 constexpr auto max_elements = 1024;
237 auto is_idle = false;
238 const std::string start_marker;
239
240 // start the cleanup coroutine for the queue
241 spawn::spawn(io_context, [this, queue_name](yield_context yield) {
242 cleanup_queue(queue_name, yield);
243 }, make_stack_allocator());
244
245 while (true) {
246 // if queue was empty the last time, sleep for idle timeout
247 if (is_idle) {
248 Timer timer(io_context);
249 timer.expires_from_now(std::chrono::microseconds(queue_idle_sleep_us));
250 boost::system::error_code ec;
251 timer.async_wait(yield[ec]);
252 }
253
254 // get list of entries in the queue
255 is_idle = true;
256 bool truncated = false;
257 std::string end_marker;
258 std::vector<cls_queue_entry> entries;
259 auto total_entries = 0U;
260 {
261 librados::ObjectReadOperation op;
262 op.assert_exists();
263 bufferlist obl;
264 int rval;
265 rados::cls::lock::assert_locked(&op, queue_name+"_lock",
266 ClsLockType::EXCLUSIVE,
267 lock_cookie,
268 "" /*no tag*/);
269 cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval);
270 // check ownership and list entries in one batch
271 auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
272 if (ret == -ENOENT) {
273 // queue was deleted
274 ldpp_dout(this, 5) << "INFO: queue: "
275 << queue_name << ". was removed. processing will stop" << dendl;
276 return;
277 }
278 if (ret == -EBUSY) {
279 ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
280 return;
281 }
282 if (ret < 0) {
283 ldpp_dout(this, 5) << "WARNING: failed to get list of entries in queue and/or lock queue: "
284 << queue_name << ". error: " << ret << " (will retry)" << dendl;
285 continue;
286 }
287 ret = cls_2pc_queue_list_entries_result(obl, entries, &truncated, end_marker);
288 if (ret < 0) {
289 ldpp_dout(this, 5) << "WARNING: failed to parse list of entries in queue: "
290 << queue_name << ". error: " << ret << " (will retry)" << dendl;
291 continue;
292 }
293 }
294 total_entries = entries.size();
295 if (total_entries == 0) {
296 // nothing in the queue
297 continue;
298 }
299 // log when queue is not idle
300 ldpp_dout(this, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name <<
301 ". end marker is: " << end_marker << dendl;
302
303 is_idle = false;
304 auto has_error = false;
305 auto remove_entries = false;
306 auto entry_idx = 1U;
307 tokens_waiter waiter(io_context);
308 for (auto& entry : entries) {
309 if (has_error) {
310 // bail out on first error
311 break;
312 }
313 // TODO pass entry pointer instead of by-value
314 spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](yield_context yield) {
315 const auto token = waiter.make_token();
316 if (process_entry(entry, yield)) {
317 ldpp_dout(this, 20) << "INFO: processing of entry: " <<
318 entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl;
319 remove_entries = true;
320 } else {
321 if (set_min_marker(end_marker, entry.marker) < 0) {
322 ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
323 } else {
324 ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
325 }
326 has_error = true;
327 ldpp_dout(this, 20) << "INFO: processing of entry: " <<
328 entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
329 }
330 }, make_stack_allocator());
331 ++entry_idx;
332 }
333
334 // wait for all pending work to finish
335 waiter.async_wait(yield);
336
337 // delete all published entries from queue
338 if (remove_entries) {
339 librados::ObjectWriteOperation op;
340 op.assert_exists();
341 rados::cls::lock::assert_locked(&op, queue_name+"_lock",
342 ClsLockType::EXCLUSIVE,
343 lock_cookie,
344 "" /*no tag*/);
345 cls_2pc_queue_remove_entries(op, end_marker);
346 // check ownership and delete entries in one batch
347 const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
348 if (ret == -ENOENT) {
349 // queue was deleted
350 ldpp_dout(this, 5) << "INFO: queue: "
351 << queue_name << ". was removed. processing will stop" << dendl;
352 return;
353 }
354 if (ret == -EBUSY) {
355 ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
356 return;
357 }
358 if (ret < 0) {
359 ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker << " from queue: "
360 << queue_name << ". error: " << ret << dendl;
361 } else {
362 ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker << " from queue: "
363 << queue_name << dendl;
364 }
365 }
366 }
367 }
368
369 // list of owned queues
370 using owned_queues_t = std::unordered_set<std::string>;
371
372 // process all queues
373 // find which of the queues are owned by this daemon and process them
374 void process_queues(yield_context yield) {
375 auto has_error = false;
376 owned_queues_t owned_queues;
377
378 // add randomness to the duration between queue checking
379 // to make sure that different daemons are not synced
380 std::random_device seed;
381 std::mt19937 rnd_gen(seed());
382 const auto min_jitter = 100; // ms
383 const auto max_jitter = 500; // ms
384 std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter);
385
386 std::vector<std::string> queue_gc;
387 std::mutex queue_gc_lock;
388 while (true) {
389 Timer timer(io_context);
390 const auto duration = (has_error ?
391 std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) +
392 std::chrono::milliseconds(duration_jitter(rnd_gen));
393 timer.expires_from_now(duration);
394 const auto tp = ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration);
395 ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp) << dendl;
396 boost::system::error_code ec;
397 timer.async_wait(yield[ec]);
398
399 queues_t queues;
400 auto ret = read_queue_list(queues, optional_yield(io_context, yield));
401 if (ret < 0) {
402 has_error = true;
403 continue;
404 }
405
406 for (const auto& queue_name : queues) {
407 // try to lock the queue to check if it is owned by this rgw
408 // or if ownership needs to be taken
409 librados::ObjectWriteOperation op;
410 op.assert_exists();
411 rados::cls::lock::lock(&op, queue_name+"_lock",
412 ClsLockType::EXCLUSIVE,
413 lock_cookie,
414 "" /*no tag*/,
415 "" /*no description*/,
416 failover_time,
417 LOCK_FLAG_MAY_RENEW);
418
419 ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
420 if (ret == -EBUSY) {
421 // lock is already taken by another RGW
422 ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
423 // if queue was owned by this RGW, processing should be stopped, queue would be deleted from list afterwards
424 continue;
425 }
426 if (ret == -ENOENT) {
427 // queue is deleted - processing will stop the next time we try to read from the queue
428 ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " should not be locked - already deleted" << dendl;
429 continue;
430 }
431 if (ret < 0) {
432 // failed to lock for another reason, continue to process other queues
433 ldpp_dout(this, 1) << "ERROR: failed to lock queue: " << queue_name << ". error: " << ret << dendl;
434 has_error = true;
435 continue;
436 }
437 // add queue to list of owned queues
438 if (owned_queues.insert(queue_name).second) {
439 ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
440 // start processing this queue
441 spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](yield_context yield) {
442 process_queue(queue_name, yield);
443 // if queue processing ended, it means that the queue was removed or not owned anymore
444 // mark it for deletion
445 std::lock_guard lock_guard(queue_gc_lock);
446 queue_gc.push_back(queue_name);
447 ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
448 }, make_stack_allocator());
449 } else {
450 ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl;
451 }
452 }
453 // erase all queues that were deleted
454 {
455 std::lock_guard lock_guard(queue_gc_lock);
456 std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
457 owned_queues.erase(queue_name);
458 ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
459 });
460 queue_gc.clear();
461 }
462 }
463 }
464
465 public:
466
467 ~Manager() {
468 work_guard.reset();
469 io_context.stop();
470 std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
471 }
472
473 // ctor: start all threads
474 Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms,
475 uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms,
476 uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
477 uint32_t _worker_count, rgw::sal::RadosStore* store) :
478 max_queue_size(_max_queue_size),
479 queues_update_period_ms(_queues_update_period_ms),
480 queues_update_retry_ms(_queues_update_retry_ms),
481 queue_idle_sleep_us(_queue_idle_sleep_us),
482 failover_time(std::chrono::milliseconds(failover_time_ms)),
483 cct(_cct),
484 rados_ioctx(store->getRados()->get_notif_pool_ctx()),
485 lock_cookie(gen_rand_alphanumeric(cct, COOKIE_LEN)),
486 work_guard(boost::asio::make_work_guard(io_context)),
487 worker_count(_worker_count),
488 stale_reservations_period_s(_stale_reservations_period_s),
489 reservations_cleanup_period_s(_reservations_cleanup_period_s)
490 {
491 spawn::spawn(io_context, [this] (yield_context yield) {
492 process_queues(yield);
493 }, make_stack_allocator());
494
495 // start the worker threads to do the actual queue processing
496 const std::string WORKER_THREAD_NAME = "notif-worker";
497 for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
498 workers.emplace_back([this]() {
499 try {
500 io_context.run();
501 } catch (const std::exception& err) {
502 ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl;
503 throw(err);
504 }
505 });
506 const auto rc = ceph_pthread_setname(workers.back().native_handle(),
507 (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
508 ceph_assert(rc == 0);
509 }
510 ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
511 }
512
513 int add_persistent_topic(const std::string& topic_name, optional_yield y) {
514 if (topic_name == Q_LIST_OBJECT_NAME) {
515 ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
516 return -EINVAL;
517 }
518 librados::ObjectWriteOperation op;
519 op.create(true);
520 cls_2pc_queue_init(op, topic_name, max_queue_size);
521 auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
522 if (ret == -EEXIST) {
523 // queue already exists - nothing to do
524 ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl;
525 return 0;
526 }
527 if (ret < 0) {
528 // failed to create queue
529 ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl;
530 return ret;
531 }
532
533 bufferlist empty_bl;
534 std::map<std::string, bufferlist> new_topic{{topic_name, empty_bl}};
535 op.omap_set(new_topic);
536 ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
537 if (ret < 0) {
538 ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl;
539 return ret;
540 }
541 ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list" << dendl;
542 return 0;
543 }
544
545 int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
546 librados::ObjectWriteOperation op;
547 op.remove();
548 auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
549 if (ret == -ENOENT) {
550 // queue already removed - nothing to do
551 ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
552 return 0;
553 }
554 if (ret < 0) {
555 // failed to remove queue
556 ldpp_dout(this, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
557 return ret;
558 }
559
560 std::set<std::string> topic_to_remove{{topic_name}};
561 op.omap_rm_keys(topic_to_remove);
562 ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
563 if (ret < 0) {
564 ldpp_dout(this, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
565 return ret;
566 }
567 ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl;
568 return 0;
569 }
570 };
571
572 // singleton manager
573 // note that the manager itself is not a singleton, and multiple instances may co-exist
574 // TODO make the pointer atomic in allocation and deallocation to avoid race conditions
575 static Manager* s_manager = nullptr;
576
577 constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
578 constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30; // check queue list every 30 seconds
579 constexpr uint32_t Q_LIST_RETRY_MSEC = 1000; // retry every second if queue list update failed
580 constexpr uint32_t IDLE_TIMEOUT_USEC = 100*1000; // idle sleep 100ms
581 constexpr uint32_t FAILOVER_TIME_MSEC = 3*Q_LIST_UPDATE_MSEC; // FAILOVER TIME 3x renew time
582 constexpr uint32_t WORKER_COUNT = 1; // 1 worker thread
583 constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120; // cleanup reservations that are more than 2 minutes old
584 constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
585
586 bool init(CephContext* cct, rgw::sal::RadosStore* store, const DoutPrefixProvider *dpp) {
587 if (s_manager) {
588 return false;
589 }
590 // TODO: take conf from CephContext
591 s_manager = new Manager(cct, MAX_QUEUE_SIZE,
592 Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC,
593 IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
594 STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
595 WORKER_COUNT,
596 store);
597 return true;
598 }
599
600 void shutdown() {
601 delete s_manager;
602 s_manager = nullptr;
603 }
604
605 int add_persistent_topic(const std::string& topic_name, optional_yield y) {
606 if (!s_manager) {
607 return -EAGAIN;
608 }
609 return s_manager->add_persistent_topic(topic_name, y);
610 }
611
612 int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
613 if (!s_manager) {
614 return -EAGAIN;
615 }
616 return s_manager->remove_persistent_topic(topic_name, y);
617 }
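// (editor's note) illustrative sketch, not part of the original file: typical
// lifecycle of this subsystem as driven by RGW startup/shutdown code. The
// function and topic names are hypothetical.
inline void lifecycle_example(CephContext* cct, rgw::sal::RadosStore* store,
                              const DoutPrefixProvider* dpp, optional_yield y) {
  if (init(cct, store, dpp)) {              // allocates the singleton Manager
    add_persistent_topic("mytopic", y);     // creates and registers the 2pc queue
    // ... publish_reserve()/publish_commit() feed events into the queue ...
    remove_persistent_topic("mytopic", y);  // removes and unregisters the queue
    shutdown();                             // stops the io_context and joins workers
  }
}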
618
619 rgw::sal::Object* get_object_with_attributes(
620 const reservation_t& res, rgw::sal::Object* obj) {
621 // in case of copy obj, the tags and metadata are taken from source
622 const auto src_obj = res.src_object ? res.src_object : obj;
623 if (src_obj->get_attrs().empty()) {
624 if (!src_obj->get_bucket()) {
625 src_obj->set_bucket(res.bucket);
626 }
627 if (src_obj->get_obj_attrs(res.obj_ctx, res.yield, res.dpp) < 0) {
628 return nullptr;
629 }
630 }
631 return src_obj;
632 }
633
634 static inline void metadata_from_attributes(
635 reservation_t& res, rgw::sal::Object* obj) {
636 auto& metadata = res.x_meta_map;
637 const auto src_obj = get_object_with_attributes(res, obj);
638 if (!src_obj) {
639 return;
640 }
641 for (auto& attr : src_obj->get_attrs()) {
642 if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
643 std::string_view key(attr.first);
644 key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1);
645 // we want to pass a null terminated version
646 // of the bufferlist, hence "to_str().c_str()"
647 metadata.emplace(key, attr.second.to_str().c_str());
648 }
649 }
650 }
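// (editor's note) example of the prefix handling above, assuming the standard
// RGW attribute prefixes (RGW_ATTR_PREFIX == "user.rgw."): an object xattr
// "user.rgw.x-amz-meta-color" with value "blue" ends up in the metadata map as
// {"x-amz-meta-color", "blue"}.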
651
652 static inline void tags_from_attributes(
653 const reservation_t& res, rgw::sal::Object* obj, KeyMultiValueMap& tags) {
654 const auto src_obj = get_object_with_attributes(res, obj);
655 if (!src_obj) {
656 return;
657 }
658 const auto& attrs = src_obj->get_attrs();
659 const auto attr_iter = attrs.find(RGW_ATTR_TAGS);
660 if (attr_iter != attrs.end()) {
661 auto bliter = attr_iter->second.cbegin();
662 RGWObjTags obj_tags;
663 try {
664 ::decode(obj_tags, bliter);
665 } catch(buffer::error&) {
666 // not able to decode tags
667 return;
668 }
669 tags = std::move(obj_tags.get_tags());
670 }
671 }
672
673 // populate event from request
674 static inline void populate_event(reservation_t& res,
675 rgw::sal::Object* obj,
676 uint64_t size,
677 const ceph::real_time& mtime,
678 const std::string& etag,
679 const std::string& version,
680 EventType event_type,
681 rgw_pubsub_s3_event& event) {
682 event.eventTime = mtime;
683 event.eventName = to_event_string(event_type);
684 event.userIdentity = res.user_id; // user that triggered the change
685 event.x_amz_request_id = res.req_id; // request ID of the original change
686 event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made
687 // configurationId is filled from notification configuration
688 event.bucket_name = res.bucket->get_name();
689 event.bucket_ownerIdentity = res.bucket->get_owner()->get_id().id;
690 event.bucket_arn = to_string(rgw::ARN(res.bucket->get_key()));
691 event.object_key = res.object_name ? *res.object_name : obj->get_name();
692 event.object_size = size;
693 event.object_etag = etag;
694 event.object_versionId = version;
695 event.awsRegion = res.store->get_zone()->get_zonegroup().api_name;
696 // use timestamp as the per-key sequence id (hex encoded)
697 const utime_t ts(real_clock::now());
698 boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
699 std::back_inserter(event.object_sequencer));
700 set_event_id(event.id, etag, ts);
701 event.bucket_id = res.bucket->get_bucket_id();
702 // pass meta data
703 if (res.x_meta_map.empty()) {
704 // no metadata cached:
705 // either no metadata exists or no metadata filter was used
706 metadata_from_attributes(res, obj);
707 }
708 // copy whatever metadata we have (cached or fetched from the attributes)
709 event.x_meta_map = res.x_meta_map;
710 // pass tags
711 if (!res.tagset ||
712 (*res.tagset).get_tags().empty()) {
713 // try to fetch the tags from the attributes
714 tags_from_attributes(res, obj, event.tags);
715 } else {
716 event.tags = (*res.tagset).get_tags();
717 }
718 // opaque data will be filled from topic configuration
719 }
720
721 static inline bool notification_match(reservation_t& res,
722 const rgw_pubsub_topic_filter& filter,
723 EventType event,
724 const RGWObjTags* req_tags) {
725 if (!match(filter.events, event)) {
726 return false;
727 }
728 const auto obj = res.object;
729 if (!match(filter.s3_filter.key_filter,
730 res.object_name ? *res.object_name : obj->get_name())) {
731 return false;
732 }
733
734 if (!filter.s3_filter.metadata_filter.kv.empty()) {
735 // metadata filter exists
736 if (res.s) {
737 res.x_meta_map = res.s->info.x_meta_map;
738 }
739 metadata_from_attributes(res, obj);
740 if (!match(filter.s3_filter.metadata_filter, res.x_meta_map)) {
741 return false;
742 }
743 }
744
745 if (!filter.s3_filter.tag_filter.kv.empty()) {
746 // tag filter exists
747 if (req_tags) {
748 // tags in the request
749 if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) {
750 return false;
751 }
752 } else if (res.tagset && !(*res.tagset).get_tags().empty()) {
753 // tags were cached in req_state
754 if (!match(filter.s3_filter.tag_filter, (*res.tagset).get_tags())) {
755 return false;
756 }
757 } else {
758 // try to fetch tags from the attributes
759 KeyMultiValueMap tags;
760 tags_from_attributes(res, obj, tags);
761 if (!match(filter.s3_filter.tag_filter, tags)) {
762 return false;
763 }
764 }
765 }
766
767 return true;
768 }
769
770 int publish_reserve(const DoutPrefixProvider* dpp,
771 EventType event_type,
772 reservation_t& res,
773 const RGWObjTags* req_tags)
774 {
775 RGWPubSub ps(res.store, res.user_tenant);
776 RGWPubSub::Bucket ps_bucket(&ps, res.bucket->get_key());
777 rgw_pubsub_bucket_topics bucket_topics;
778 auto rc = ps_bucket.get_topics(&bucket_topics);
779 if (rc < 0) {
780 // failed to fetch bucket topics
781 return rc;
782 }
783 for (const auto& bucket_topic : bucket_topics.topics) {
784 const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
785 const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
786 if (!notification_match(res, topic_filter, event_type, req_tags)) {
787 // notification does not apply to req_state
788 continue;
789 }
790 ldpp_dout(res.dpp, 20) << "INFO: notification: '" << topic_filter.s3_id <<
791 "' on topic: '" << topic_cfg.dest.arn_topic <<
792 "' and bucket: '" << res.bucket->get_name() <<
793 "' (unique topic: '" << topic_cfg.name <<
794 "') apply to event of type: '" << to_string(event_type) << "'" << dendl;
795
796 cls_2pc_reservation::id_t res_id;
797 if (topic_cfg.dest.persistent) {
798 // TODO: take default reservation size from conf
799 constexpr auto DEFAULT_RESERVATION = 4*1024U; // 4K
800 res.size = DEFAULT_RESERVATION;
801 librados::ObjectWriteOperation op;
802 bufferlist obl;
803 int rval;
804 const auto& queue_name = topic_cfg.dest.arn_topic;
805 cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
806 auto ret = rgw_rados_operate(
807 res.dpp, res.store->getRados()->get_notif_pool_ctx(),
808 queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
809 if (ret < 0) {
810 ldpp_dout(res.dpp, 1) <<
811 "ERROR: failed to reserve notification on queue: "
812 << queue_name << ". error: " << ret << dendl;
813 // if no space is left in queue we ask client to slow down
814 return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
815 }
816 ret = cls_2pc_queue_reserve_result(obl, res_id);
817 if (ret < 0) {
818 ldpp_dout(res.dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
819 return ret;
820 }
821 }
822 res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id);
823 }
824 return 0;
825 }
826
827 int publish_commit(rgw::sal::Object* obj,
828 uint64_t size,
829 const ceph::real_time& mtime,
830 const std::string& etag,
831 const std::string& version,
832 EventType event_type,
833 reservation_t& res,
834 const DoutPrefixProvider* dpp)
835 {
836 for (auto& topic : res.topics) {
837 if (topic.cfg.dest.persistent &&
838 topic.res_id == cls_2pc_reservation::NO_ID) {
839 // nothing to commit or already committed/aborted
840 continue;
841 }
842 event_entry_t event_entry;
843 populate_event(res, obj, size, mtime, etag, version, event_type, event_entry.event);
844 event_entry.event.configurationId = topic.configurationId;
845 event_entry.event.opaque_data = topic.cfg.opaque_data;
846 if (topic.cfg.dest.persistent) {
847 event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
848 event_entry.push_endpoint_args =
849 std::move(topic.cfg.dest.push_endpoint_args);
850 event_entry.arn_topic = topic.cfg.dest.arn_topic;
851 bufferlist bl;
852 encode(event_entry, bl);
853 const auto& queue_name = topic.cfg.dest.arn_topic;
854 if (bl.length() > res.size) {
855 // try to make a larger reservation, fail only if this is not possible
856 ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length()
857 << " exceeded reserved size: " << res.size
858 <<
859 " . trying to make a larger reservation on queue:" << queue_name
860 << dendl;
861 // first cancel the existing reservation
862 librados::ObjectWriteOperation op;
863 cls_2pc_queue_abort(op, topic.res_id);
864 auto ret = rgw_rados_operate(
865 dpp, res.store->getRados()->get_notif_pool_ctx(),
866 topic.cfg.dest.arn_topic, &op,
867 res.yield);
868 if (ret < 0) {
869 ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: "
870 << topic.res_id <<
871 " when trying to make a larger reservation on queue: " << queue_name
872 << ". error: " << ret << dendl;
873 return ret;
874 }
875 // now try to make a bigger one
876 buffer::list obl;
877 int rval;
878 cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
879 ret = rgw_rados_operate(
880 dpp, res.store->getRados()->get_notif_pool_ctx(),
881 queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
882 if (ret < 0) {
883 ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: "
884 << queue_name
885 << ". error: " << ret << dendl;
886 return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
887 }
888 ret = cls_2pc_queue_reserve_result(obl, topic.res_id);
889 if (ret < 0) {
890 ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for "
891 "extra space. error: " << ret << dendl;
892 return ret;
893 }
894 }
895 std::vector<buffer::list> bl_data_vec{std::move(bl)};
896 librados::ObjectWriteOperation op;
897 cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
898 const auto ret = rgw_rados_operate(
899 dpp, res.store->getRados()->get_notif_pool_ctx(),
900 queue_name, &op, res.yield);
901 topic.res_id = cls_2pc_reservation::NO_ID;
902 if (ret < 0) {
903 ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
904 << queue_name << ". error: " << ret
905 << dendl;
906 return ret;
907 }
908 } else {
909 try {
910 // TODO add endpoint LRU cache
911 const auto push_endpoint = RGWPubSubEndpoint::create(
912 topic.cfg.dest.push_endpoint,
913 topic.cfg.dest.arn_topic,
914 RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp),
915 dpp->get_cct());
916 ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: "
917 << topic.cfg.dest.push_endpoint << dendl;
918 const auto ret = push_endpoint->send_to_completion_async(
919 dpp->get_cct(), event_entry.event, res.yield);
920 if (ret < 0) {
921 ldpp_dout(dpp, 1) << "ERROR: push to endpoint "
922 << topic.cfg.dest.push_endpoint
923 << " failed. error: " << ret << dendl;
924 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
925 return ret;
926 }
927 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
928 } catch (const RGWPubSubEndpoint::configuration_error& e) {
929 ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: "
930 << topic.cfg.dest.push_endpoint << ". error: " << e.what() << dendl;
931 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
932 return -EINVAL;
933 }
934 }
935 }
936 return 0;
937 }
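// (editor's note) illustrative sketch, not part of the original file: how an
// RGW op is expected to drive the two-phase publish above. The reservation is
// taken before the object change, committed once the change succeeds, and
// otherwise aborted automatically by ~reservation_t(). The function name and
// the event type used here are illustrative only.
inline int publish_flow_example(const DoutPrefixProvider* dpp, reservation_t& res,
                                rgw::sal::Object* obj, uint64_t size,
                                const ceph::real_time& mtime,
                                const std::string& etag, const std::string& version) {
  auto ret = publish_reserve(dpp, EventType::ObjectCreatedPut, res, nullptr);
  if (ret < 0) {
    return ret;  // e.g. -ERR_RATE_LIMITED when a persistent queue is full
  }
  // ... apply the actual object change here ...
  return publish_commit(obj, size, mtime, etag, version,
                        EventType::ObjectCreatedPut, res, dpp);
  // on an early return instead, ~reservation_t() would call publish_abort()
}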
938
939 extern int publish_abort(reservation_t& res) {
940 for (auto& topic : res.topics) {
941 if (!topic.cfg.dest.persistent ||
942 topic.res_id == cls_2pc_reservation::NO_ID) {
943 // nothing to abort or already committed/aborted
944 continue;
945 }
946 const auto& queue_name = topic.cfg.dest.arn_topic;
947 librados::ObjectWriteOperation op;
948 cls_2pc_queue_abort(op, topic.res_id);
949 const auto ret = rgw_rados_operate(
950 res.dpp, res.store->getRados()->get_notif_pool_ctx(),
951 queue_name, &op, res.yield);
952 if (ret < 0) {
953 ldpp_dout(res.dpp, 1) << "ERROR: failed to abort reservation: "
954 << topic.res_id <<
955 " from queue: " << queue_name << ". error: " << ret << dendl;
956 return ret;
957 }
958 topic.res_id = cls_2pc_reservation::NO_ID;
959 }
960 return 0;
961 }
962
963 reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
964 rgw::sal::RadosStore* _store,
965 req_state* _s,
966 rgw::sal::Object* _object,
967 rgw::sal::Object* _src_object,
968 const std::string* _object_name) :
969 dpp(_s), store(_store), s(_s), size(0) /* XXX */, obj_ctx(_s->obj_ctx),
970 object(_object), src_object(_src_object), bucket(_s->bucket.get()),
971 object_name(_object_name),
972 tagset(_s->tagset),
973 x_meta_map(_s->info.x_meta_map),
974 user_id(_s->user->get_id().id),
975 user_tenant(_s->user->get_id().tenant),
976 req_id(_s->req_id),
977 yield(_s->yield)
978 {}
979
980 reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
981 rgw::sal::RadosStore* _store,
982 RGWObjectCtx* _obj_ctx,
983 rgw::sal::Object* _object,
984 rgw::sal::Object* _src_object,
985 rgw::sal::Bucket* _bucket,
986 std::string& _user_id,
987 std::string& _user_tenant,
988 std::string& _req_id,
989 optional_yield y) :
990 dpp(_dpp), store(_store), s(nullptr), size(0) /* XXX */,
991 obj_ctx(_obj_ctx),
992 object(_object), src_object(_src_object), bucket(_bucket),
993 object_name(nullptr),
994 user_id(_user_id),
995 user_tenant(_user_tenant),
996 req_id(_req_id),
997 yield(y)
998 {}
999
1000 reservation_t::~reservation_t() {
1001 publish_abort(*this);
1002 }
1003
1004 } // namespace rgw::notify