1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_notify.h"
5 #include "cls/2pc_queue/cls_2pc_queue_client.h"
6 #include "cls/lock/cls_lock_client.h"
7 #include <memory>
8 #include <boost/algorithm/hex.hpp>
9 #include <boost/context/protected_fixedsize_stack.hpp>
10 #include <spawn/spawn.hpp>
11 #include "rgw_pubsub.h"
12 #include "rgw_pubsub_push.h"
13 #include "rgw_perf_counters.h"
14 #include "rgw_sal_rados.h"
15 #include "common/dout.h"
16 #include <chrono>
17
18 #define dout_subsys ceph_subsys_rgw
19
20 namespace rgw::notify {
21
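// an entry stored in a persistent (2pc) queue: the S3 event itself plus the
// push endpoint information needed to deliver it later, so delivery does not
// depend on re-reading the topic configuration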
22 struct event_entry_t {
23 rgw_pubsub_s3_event event;
24 std::string push_endpoint;
25 std::string push_endpoint_args;
26 std::string arn_topic;
27
28 void encode(bufferlist& bl) const {
29 ENCODE_START(1, 1, bl);
30 encode(event, bl);
31 encode(push_endpoint, bl);
32 encode(push_endpoint_args, bl);
33 encode(arn_topic, bl);
34 ENCODE_FINISH(bl);
35 }
36
37 void decode(bufferlist::const_iterator& bl) {
38 DECODE_START(1, bl);
39 decode(event, bl);
40 decode(push_endpoint, bl);
41 decode(push_endpoint_args, bl);
42 decode(arn_topic, bl);
43 DECODE_FINISH(bl);
44 }
45 };
46 WRITE_CLASS_ENCODER(event_entry_t)
47
48 using queues_t = std::set<std::string>;
49
50 // use mmap/mprotect to allocate 128k coroutine stacks
51 auto make_stack_allocator() {
52 return boost::context::protected_fixedsize_stack{128*1024};
53 }
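// note: protected_fixedsize_stack backs each stack with mmap and places a
// guard page at its end, so a coroutine that overflows its 128k stack faults
// immediately instead of silently corrupting neighboring memory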
54
55 class Manager : public DoutPrefixProvider {
56 const size_t max_queue_size;
57 const uint32_t queues_update_period_ms;
58 const uint32_t queues_update_retry_ms;
59 const uint32_t queue_idle_sleep_us;
60 const utime_t failover_time;
61 CephContext* const cct;
62 librados::IoCtx& rados_ioctx;
63 static constexpr auto COOKIE_LEN = 16;
64 const std::string lock_cookie;
65 boost::asio::io_context io_context;
66 boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard;
67 const uint32_t worker_count;
68 std::vector<std::thread> workers;
69 const uint32_t stale_reservations_period_s;
70 const uint32_t reservations_cleanup_period_s;
71
72 const std::string Q_LIST_OBJECT_NAME = "queues_list_object";
73
74 CephContext *get_cct() const override { return cct; }
75 unsigned get_subsys() const override { return dout_subsys; }
76 std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw notify: "; }
77
78 // read the list of queues from the queue list object
79 int read_queue_list(queues_t& queues, optional_yield y) {
80 constexpr auto max_chunk = 1024U;
81 std::string start_after;
82 bool more = true;
83 int rval;
84 while (more) {
85 librados::ObjectReadOperation op;
86 queues_t queues_chunk;
87 op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
88 const auto ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
89 if (ret == -ENOENT) {
90 // queue list object was not created - nothing to do
91 return 0;
92 }
93 if (ret < 0) {
94 // TODO: do we need to check on rval as well as ret?
95 ldpp_dout(this, 1) << "ERROR: failed to read queue list. error: " << ret << dendl;
96 return ret;
97 }
98 queues.merge(queues_chunk);
99 }
100 return 0;
101 }
102
103 // set m1 to be the minimum between m1 and m2
104   static int set_min_marker(std::string& m1, const std::string& m2) {
105 cls_queue_marker mr1;
106 cls_queue_marker mr2;
107 if (mr1.from_str(m1.c_str()) < 0 || mr2.from_str(m2.c_str()) < 0) {
108 return -EINVAL;
109 }
110 if (mr2.gen <= mr1.gen && mr2.offset < mr1.offset) {
111 m1 = m2;
112 }
113 return 0;
114 }
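// markers are parsed by cls_queue_marker::from_str(); conceptually each marker
// is a (generation, offset) pair, and m1 is replaced by m2 only when m2's
// generation is not newer and its offset is strictly smaller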
115
116 using Clock = ceph::coarse_mono_clock;
117 using Executor = boost::asio::io_context::executor_type;
118 using Timer = boost::asio::basic_waitable_timer<Clock,
119 boost::asio::wait_traits<Clock>, Executor>;
120
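// join-point for the per-entry coroutines spawned in process_queue(): every
// coroutine holds a token for its lifetime, and async_wait() parks on a timer
// that is canceled only when the last token is destroyed, i.e. when all
// pending entries have been handled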
121 class tokens_waiter {
122 const std::chrono::hours infinite_duration;
123 size_t pending_tokens;
124 Timer timer;
125
126 struct token {
127 tokens_waiter& waiter;
128 token(tokens_waiter& _waiter) : waiter(_waiter) {
129 ++waiter.pending_tokens;
130 }
131
132 ~token() {
133 --waiter.pending_tokens;
134 if (waiter.pending_tokens == 0) {
135 waiter.timer.cancel();
136 }
137 }
138 };
139
140 public:
141
142 tokens_waiter(boost::asio::io_context& io_context) :
143 infinite_duration(1000),
144 pending_tokens(0),
145 timer(io_context) {}
146
147 void async_wait(spawn::yield_context yield) {
148 timer.expires_from_now(infinite_duration);
149 boost::system::error_code ec;
150 timer.async_wait(yield[ec]);
151 ceph_assert(ec == boost::system::errc::operation_canceled);
152 }
153
154 token make_token() {
155 return token(*this);
156 }
157 };
158
159 // processing of a specific entry
160   // return whether processing was successful (true) or not (false)
161 bool process_entry(const cls_queue_entry& entry, spawn::yield_context yield) {
162 event_entry_t event_entry;
163 auto iter = entry.data.cbegin();
164 try {
165 decode(event_entry, iter);
166 } catch (buffer::error& err) {
167 ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
168 return false;
169 }
170 try {
171 // TODO move endpoint creation to queue level
172 const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
173 RGWHTTPArgs(event_entry.push_endpoint_args, this),
174 cct);
175 ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
176 " for entry: " << entry.marker << dendl;
177 const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
178 if (ret < 0) {
179 ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
180 << " failed. error: " << ret << " (will retry)" << dendl;
181 return false;
182 } else {
183 ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
184 << " ok" << dendl;
185 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
186 return true;
187 }
188 } catch (const RGWPubSubEndpoint::configuration_error& e) {
189 ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: "
190 << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
191 return false;
192 }
193 }
194
195   // clean stale reservations from the queue
196 void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) {
197 while (true) {
198 ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
199 const auto now = ceph::coarse_real_time::clock::now();
200 const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
201 librados::ObjectWriteOperation op;
202 op.assert_exists();
203 rados::cls::lock::assert_locked(&op, queue_name+"_lock",
204 ClsLockType::EXCLUSIVE,
205 lock_cookie,
206 "" /*no tag*/);
207 cls_2pc_queue_expire_reservations(op, stale_time);
208 // check ownership and do reservation cleanup in one batch
209 auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
210 if (ret == -ENOENT) {
211 // queue was deleted
212 ldpp_dout(this, 5) << "INFO: queue: "
213 << queue_name << ". was removed. cleanup will stop" << dendl;
214 return;
215 }
216 if (ret == -EBUSY) {
217 ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
218 return;
219 }
220 if (ret < 0) {
221 ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
222 << ". error: " << ret << dendl;
223 }
224 Timer timer(io_context);
225 timer.expires_from_now(std::chrono::seconds(reservations_cleanup_period_s));
226 boost::system::error_code ec;
227 timer.async_wait(yield[ec]);
228 }
229 }
230
231 // processing of a specific queue
232 void process_queue(const std::string& queue_name, spawn::yield_context yield) {
233 constexpr auto max_elements = 1024;
234 auto is_idle = false;
235 const std::string start_marker;
236
237     // start the cleanup coroutine for the queue
238 spawn::spawn(io_context, [this, queue_name](spawn::yield_context yield) {
239 cleanup_queue(queue_name, yield);
240 }, make_stack_allocator());
241
242 while (true) {
243 // if queue was empty the last time, sleep for idle timeout
244 if (is_idle) {
245 Timer timer(io_context);
246 timer.expires_from_now(std::chrono::microseconds(queue_idle_sleep_us));
247 boost::system::error_code ec;
248 timer.async_wait(yield[ec]);
249 }
250
251 // get list of entries in the queue
252 is_idle = true;
253 bool truncated = false;
254 std::string end_marker;
255 std::vector<cls_queue_entry> entries;
256 auto total_entries = 0U;
257 {
258 librados::ObjectReadOperation op;
259 op.assert_exists();
260 bufferlist obl;
261 int rval;
262 rados::cls::lock::assert_locked(&op, queue_name+"_lock",
263 ClsLockType::EXCLUSIVE,
264 lock_cookie,
265 "" /*no tag*/);
266 cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval);
267 // check ownership and list entries in one batch
268 auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
269 if (ret == -ENOENT) {
270 // queue was deleted
271 ldpp_dout(this, 5) << "INFO: queue: "
272 << queue_name << ". was removed. processing will stop" << dendl;
273 return;
274 }
275 if (ret == -EBUSY) {
276 ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
277 return;
278 }
279 if (ret < 0) {
280 ldpp_dout(this, 5) << "WARNING: failed to get list of entries in queue and/or lock queue: "
281 << queue_name << ". error: " << ret << " (will retry)" << dendl;
282 continue;
283 }
284 ret = cls_2pc_queue_list_entries_result(obl, entries, &truncated, end_marker);
285 if (ret < 0) {
286 ldpp_dout(this, 5) << "WARNING: failed to parse list of entries in queue: "
287 << queue_name << ". error: " << ret << " (will retry)" << dendl;
288 continue;
289 }
290 }
291 total_entries = entries.size();
292 if (total_entries == 0) {
293 // nothing in the queue
294 continue;
295 }
296 // log when queue is not idle
297 ldpp_dout(this, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name <<
298 ". end marker is: " << end_marker << dendl;
299
300 is_idle = false;
301 auto has_error = false;
302 auto remove_entries = false;
303 auto entry_idx = 1U;
304 tokens_waiter waiter(io_context);
305 for (auto& entry : entries) {
306 if (has_error) {
307 // bail out on first error
308 break;
309 }
310 // TODO pass entry pointer instead of by-value
311 spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](spawn::yield_context yield) {
312 const auto token = waiter.make_token();
313 if (process_entry(entry, yield)) {
314 ldpp_dout(this, 20) << "INFO: processing of entry: " <<
315 entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl;
316 remove_entries = true;
317 } else {
318 if (set_min_marker(end_marker, entry.marker) < 0) {
319             ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
320 } else {
321 ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
322 }
323 has_error = true;
324 ldpp_dout(this, 20) << "INFO: processing of entry: " <<
325 entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
326 }
327 }, make_stack_allocator());
328 ++entry_idx;
329 }
330
331 // wait for all pending work to finish
332 waiter.async_wait(yield);
333
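// retry semantics: entries are removed only up to end_marker. when an entry
// fails, end_marker is pulled back to that entry's marker (via set_min_marker),
// so the failed entry and everything after it stay in the queue and are
// retried on the next iteration (delivery is at-least-once: an entry pushed
// successfully after a failed one may be pushed again)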
334 // delete all published entries from queue
335 if (remove_entries) {
336 librados::ObjectWriteOperation op;
337 op.assert_exists();
338 rados::cls::lock::assert_locked(&op, queue_name+"_lock",
339 ClsLockType::EXCLUSIVE,
340 lock_cookie,
341 "" /*no tag*/);
342 cls_2pc_queue_remove_entries(op, end_marker);
343         // check ownership and delete entries in one batch
344 const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
345 if (ret == -ENOENT) {
346 // queue was deleted
347 ldpp_dout(this, 5) << "INFO: queue: "
348 << queue_name << ". was removed. processing will stop" << dendl;
349 return;
350 }
351 if (ret == -EBUSY) {
352 ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
353 return;
354 }
355 if (ret < 0) {
356 ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker << " from queue: "
357 << queue_name << ". error: " << ret << dendl;
358 } else {
359 ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker << " from queue: "
360 << queue_name << dendl;
361 }
362 }
363
364 }
365 }
366
367   // list of owned queues
368 using owned_queues_t = std::unordered_set<std::string>;
369
370   // process all queues
371   // find which of the queues are owned by this daemon and process them
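// ownership of a queue is an exclusive cls lock on the queue object (named
// "<queue name>_lock"), taken with LOCK_FLAG_MAY_RENEW and re-taken on every
// pass of the loop below; if this daemon stops renewing (e.g. it crashed) the
// lock expires after failover_time and another RGW may take the queue over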
372 void process_queues(spawn::yield_context yield) {
373 auto has_error = false;
374 owned_queues_t owned_queues;
375
376     // add randomness to the duration between queue checks
377     // to make sure that different daemons are not synchronized
378 std::random_device seed;
379 std::mt19937 rnd_gen(seed());
380 const auto min_jitter = 100; // ms
381 const auto max_jitter = 500; // ms
382 std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter);
383
384 while (true) {
385 Timer timer(io_context);
386 const auto duration = (has_error ?
387 std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) +
388 std::chrono::milliseconds(duration_jitter(rnd_gen));
389 timer.expires_from_now(duration);
390 const auto tp = ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration);
391 ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp) << dendl;
392 boost::system::error_code ec;
393 timer.async_wait(yield[ec]);
394
395 queues_t queues;
396 auto ret = read_queue_list(queues, optional_yield(io_context, yield));
397 if (ret < 0) {
398 has_error = true;
399 continue;
400 }
401
402 std::vector<std::string> queue_gc;
403 std::mutex queue_gc_lock;
404 for (const auto& queue_name : queues) {
405         // try to lock the queue to check if it is owned by this rgw
406         // or if ownership needs to be taken
407 librados::ObjectWriteOperation op;
408 op.assert_exists();
409 rados::cls::lock::lock(&op, queue_name+"_lock",
410 ClsLockType::EXCLUSIVE,
411 lock_cookie,
412 "" /*no tag*/,
413 "" /*no description*/,
414 failover_time,
415 LOCK_FLAG_MAY_RENEW);
416
417 ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
418 if (ret == -EBUSY) {
419 // lock is already taken by another RGW
420 ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
421           // if the queue was owned by this RGW, its processing will stop and the queue will be removed from the owned list afterwards
422 continue;
423 }
424 if (ret == -ENOENT) {
425 // queue is deleted - processing will stop the next time we try to read from the queue
426 ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " should not be locked - already deleted" << dendl;
427 continue;
428 }
429 if (ret < 0) {
430 // failed to lock for another reason, continue to process other queues
431 ldpp_dout(this, 1) << "ERROR: failed to lock queue: " << queue_name << ". error: " << ret << dendl;
432 has_error = true;
433 continue;
434 }
435 // add queue to list of owned queues
436 if (owned_queues.insert(queue_name).second) {
437 ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
438 // start processing this queue
439 spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](spawn::yield_context yield) {
440 process_queue(queue_name, yield);
441             // if queue processing ended, it means that the queue was removed or is no longer owned
442 // mark it for deletion
443 std::lock_guard lock_guard(queue_gc_lock);
444 queue_gc.push_back(queue_name);
445 ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
446 }, make_stack_allocator());
447 } else {
448 ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl;
449 }
450 }
451       // erase all queues that were deleted
452 {
453 std::lock_guard lock_guard(queue_gc_lock);
454 std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
455 owned_queues.erase(queue_name);
456 ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
457 });
458 queue_gc.clear();
459 }
460 }
461 }
462
463 public:
464
465 ~Manager() {
466 work_guard.reset();
467 io_context.stop();
468 std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
469 }
470
471 // ctor: start all threads
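// all coroutines (queue discovery, per-queue processing, per-entry pushes)
// run on the single io_context; each worker thread calls io_context.run(),
// and the work_guard keeps run() from returning while the queues are idle,
// until the destructor resets it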
472 Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms,
473           uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, uint32_t failover_time_ms,
474 uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
475 uint32_t _worker_count, rgw::sal::RGWRadosStore* store) :
476 max_queue_size(_max_queue_size),
477 queues_update_period_ms(_queues_update_period_ms),
478 queues_update_retry_ms(_queues_update_retry_ms),
479 queue_idle_sleep_us(_queue_idle_sleep_us),
480 failover_time(std::chrono::milliseconds(failover_time_ms)),
481 cct(_cct),
482 rados_ioctx(store->getRados()->get_notif_pool_ctx()),
483 lock_cookie(gen_rand_alphanumeric(cct, COOKIE_LEN)),
484 work_guard(boost::asio::make_work_guard(io_context)),
485 worker_count(_worker_count),
486 stale_reservations_period_s(_stale_reservations_period_s),
487 reservations_cleanup_period_s(_reservations_cleanup_period_s)
488 {
489 spawn::spawn(io_context, [this](spawn::yield_context yield) {
490 process_queues(yield);
491 }, make_stack_allocator());
492
493 // start the worker threads to do the actual queue processing
494 const std::string WORKER_THREAD_NAME = "notif-worker";
495 for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
496 workers.emplace_back([this]() { io_context.run(); });
497 const auto rc = ceph_pthread_setname(workers.back().native_handle(),
498 (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
499 ceph_assert(rc == 0);
500 }
501 ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
502 }
503
504 int add_persistent_topic(const std::string& topic_name, optional_yield y) {
505 if (topic_name == Q_LIST_OBJECT_NAME) {
506 ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
507 return -EINVAL;
508 }
509 librados::ObjectWriteOperation op;
510 op.create(true);
511 cls_2pc_queue_init(op, topic_name, max_queue_size);
512 auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
513 if (ret == -EEXIST) {
514 // queue already exists - nothing to do
515 ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl;
516 return 0;
517 }
518 if (ret < 0) {
519 // failed to create queue
520 ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl;
521 return ret;
522 }
523
524 bufferlist empty_bl;
525 std::map<std::string, bufferlist> new_topic{{topic_name, empty_bl}};
526 op.omap_set(new_topic);
527 ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
528 if (ret < 0) {
529 ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl;
530 return ret;
531 }
532 ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list" << dendl;
533 return 0;
534 }
535
536 int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
537 librados::ObjectWriteOperation op;
538 op.remove();
539 auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
540 if (ret == -ENOENT) {
541 // queue already removed - nothing to do
542 ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
543 return 0;
544 }
545 if (ret < 0) {
546 // failed to remove queue
547 ldpp_dout(this, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
548 return ret;
549 }
550
551 std::set<std::string> topic_to_remove{{topic_name}};
552 op.omap_rm_keys(topic_to_remove);
553 ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
554 if (ret < 0) {
555 ldpp_dout(this, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
556 return ret;
557 }
558 ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl;
559 return 0;
560 }
561 };
562
563 // pointer to the global manager instance used by the functions below
564 // note that the Manager class itself is not a singleton, and multiple instances may co-exist
565 // TODO make the pointer atomic in allocation and deallocation to avoid race conditions
566 static Manager* s_manager = nullptr;
567
568 constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
569 constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30;      // check queue list every 30 seconds
570 constexpr uint32_t Q_LIST_RETRY_MSEC = 1000; // retry every second if queue list update failed
571 constexpr uint32_t IDLE_TIMEOUT_USEC = 100*1000; // idle sleep 100ms
572 constexpr uint32_t FAILOVER_TIME_MSEC = 3*Q_LIST_UPDATE_MSEC; // failover time is 3x the lock renew (list update) period
573 constexpr uint32_t WORKER_COUNT = 1; // 1 worker thread
574 constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120; // cleanup reservations that are more than 2 minutes old
575 constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
576
577 bool init(CephContext* cct, rgw::sal::RGWRadosStore* store, const DoutPrefixProvider *dpp) {
578 if (s_manager) {
579 return false;
580 }
581 // TODO: take conf from CephContext
582 s_manager = new Manager(cct, MAX_QUEUE_SIZE,
583 Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC,
584 IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
585 STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
586 WORKER_COUNT,
587 store);
588 return true;
589 }
590
591 void shutdown() {
592 delete s_manager;
593 s_manager = nullptr;
594 }
595
596 int add_persistent_topic(const std::string& topic_name, optional_yield y) {
597 if (!s_manager) {
598 return -EAGAIN;
599 }
600 return s_manager->add_persistent_topic(topic_name, y);
601 }
602
603 int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
604 if (!s_manager) {
605 return -EAGAIN;
606 }
607 return s_manager->remove_persistent_topic(topic_name, y);
608 }
609
610 rgw::sal::RGWObject* get_object_with_atttributes(const req_state* s, rgw::sal::RGWObject* obj) {
611   // in the case of an object copy, tags and metadata are taken from the source object
612 const auto src_obj = s->src_object ? s->src_object.get() : obj;
613 if (src_obj->get_attrs().empty()) {
614 if (!src_obj->get_bucket()) {
615 src_obj->set_bucket(s->bucket.get());
616 }
617 if (src_obj->get_obj_attrs(s->obj_ctx, s->yield, s) < 0) {
618 return nullptr;
619 }
620 }
621 return src_obj;
622 }
623
624 void metadata_from_attributes(const req_state* s, rgw::sal::RGWObject* obj, KeyValueMap& metadata) {
625 const auto src_obj = get_object_with_atttributes(s, obj);
626 if (!src_obj) {
627 return;
628 }
629 for (auto& attr : src_obj->get_attrs()) {
630 if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
631 std::string_view key(attr.first);
632 key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1);
633 // we want to pass a null terminated version
634 // of the bufferlist, hence "to_str().c_str()"
635 metadata.emplace(key, attr.second.to_str().c_str());
636 }
637 }
638 }
639
640 void tags_from_attributes(const req_state* s, rgw::sal::RGWObject* obj, KeyValueMap& tags) {
641 const auto src_obj = get_object_with_atttributes(s, obj);
642 if (!src_obj) {
643 return;
644 }
645 const auto& attrs = src_obj->get_attrs();
646 const auto attr_iter = attrs.find(RGW_ATTR_TAGS);
647 if (attr_iter != attrs.end()) {
648 auto bliter = attr_iter->second.cbegin();
649 RGWObjTags obj_tags;
650 try {
651 ::decode(obj_tags, bliter);
652 } catch(buffer::error&) {
653 // not able to decode tags
654 return;
655 }
656 tags = std::move(obj_tags.get_tags());
657 }
658 }
659
660 // populate event from request
661 void populate_event_from_request(const req_state *s,
662 rgw::sal::RGWObject* obj,
663 uint64_t size,
664 const ceph::real_time& mtime,
665 const std::string& etag,
666 EventType event_type,
667 rgw_pubsub_s3_event& event) {
668 event.eventTime = mtime;
669 event.eventName = to_string(event_type);
670 event.userIdentity = s->user->get_id().id; // user that triggered the change
671 event.x_amz_request_id = s->req_id; // request ID of the original change
672 event.x_amz_id_2 = s->host_id; // RGW on which the change was made
673 // configurationId is filled from notification configuration
674 event.bucket_name = s->bucket_name;
675 event.bucket_ownerIdentity = s->bucket_owner.get_id().id;
676 event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key()));
677 event.object_key = obj->get_name();
678 event.object_size = size;
679 event.object_etag = etag;
680 event.object_versionId = obj->get_instance();
681   // use the timestamp as the per-key sequence id (hex encoded)
682 const utime_t ts(real_clock::now());
683 boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
684 std::back_inserter(event.object_sequencer));
685 set_event_id(event.id, etag, ts);
686 event.bucket_id = s->bucket->get_bucket_id();
687   // pass metadata
688 if (s->info.x_meta_map.empty()) {
689 // try to fetch the metadata from the attributes
690 metadata_from_attributes(s, obj, event.x_meta_map);
691 } else {
692 event.x_meta_map = s->info.x_meta_map;
693 }
694 // pass tags
695 if (s->tagset.get_tags().empty()) {
696 // try to fetch the tags from the attributes
697 tags_from_attributes(s, obj, event.tags);
698 } else {
699 event.tags = s->tagset.get_tags();
700 }
701 // opaque data will be filled from topic configuration
702 }
703
704 bool notification_match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::RGWObject* obj,
705 EventType event, const RGWObjTags* req_tags) {
706 if (!match(filter.events, event)) {
707 return false;
708 }
709 if (!match(filter.s3_filter.key_filter, obj->get_name())) {
710 return false;
711 }
712
713 if (!filter.s3_filter.metadata_filter.kv.empty()) {
714 // metadata filter exists
715 if (!s->info.x_meta_map.empty()) {
716 // metadata was cached in req_state
717 if (!match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
718 return false;
719 }
720 } else {
721 // try to fetch the metadata from the attributes
722 KeyValueMap metadata;
723 metadata_from_attributes(s, obj, metadata);
724 if (!match(filter.s3_filter.metadata_filter, metadata)) {
725 return false;
726 }
727 }
728 }
729
730 if (!filter.s3_filter.tag_filter.kv.empty()) {
731 // tag filter exists
732 if (req_tags) {
733 // tags in the request
734 if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) {
735 return false;
736 }
737 } else if (!s->tagset.get_tags().empty()) {
738 // tags were cached in req_state
739 if (!match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {
740 return false;
741 }
742 } else {
743 // try to fetch tags from the attributes
744 KeyValueMap tags;
745 tags_from_attributes(s, obj, tags);
746 if (!match(filter.s3_filter.tag_filter, tags)) {
747 return false;
748 }
749 }
750 }
751
752 return true;
753 }
754
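// request-path publishing is a two-phase protocol:
//   publish_reserve() - called while the request is processed; for persistent
//     topics it makes a 2pc reservation in the topic's queue
//   publish_commit()  - called after the operation succeeded; writes the event
//     into the queue (persistent topics) or pushes it synchronously
//   publish_abort()   - releases unused reservations; it is also called from
//     the reservation_t destructor, so dropping an uncommitted reservation
//     object is enough to clean up
// illustrative call sequence (construction of reservation_t is defined in
// rgw_notify.h; the names below are only a sketch):
//   reservation_t res(...);
//   if (publish_reserve(dpp, event_type, res, req_tags) == 0) {
//     // ... perform the object operation ...
//     publish_commit(obj, size, mtime, etag, event_type, res, dpp);
//   }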
755 int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
756 reservation_t& res,
757 const RGWObjTags* req_tags)
758 {
759 RGWPubSub ps(res.store, res.s->user->get_id().tenant);
760 RGWPubSub::Bucket ps_bucket(&ps, res.s->bucket->get_key());
761 rgw_pubsub_bucket_topics bucket_topics;
762 auto rc = ps_bucket.get_topics(&bucket_topics);
763 if (rc < 0) {
764 // failed to fetch bucket topics
765 return rc;
766 }
767 for (const auto& bucket_topic : bucket_topics.topics) {
768 const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
769 const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
770 if (!notification_match(topic_filter, res.s, res.object, event_type, req_tags)) {
771 // notification does not apply to req_state
772 continue;
773 }
774 ldpp_dout(dpp, 20) << "INFO: notification: '" << topic_filter.s3_id <<
775 "' on topic: '" << topic_cfg.dest.arn_topic <<
776 "' and bucket: '" << res.s->bucket->get_name() <<
777 "' (unique topic: '" << topic_cfg.name <<
778 "') apply to event of type: '" << to_string(event_type) << "'" << dendl;
779
780 cls_2pc_reservation::id_t res_id;
781 if (topic_cfg.dest.persistent) {
782 // TODO: take default reservation size from conf
783 constexpr auto DEFAULT_RESERVATION = 4*1024U; // 4K
784 res.size = DEFAULT_RESERVATION;
785 librados::ObjectWriteOperation op;
786 bufferlist obl;
787 int rval;
788 const auto& queue_name = topic_cfg.dest.arn_topic;
789 cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
790 auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
791 queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC);
792 if (ret < 0) {
793 ldpp_dout(dpp, 1) << "ERROR: failed to reserve notification on queue: " << queue_name
794 << ". error: " << ret << dendl;
795       // if no space is left in the queue, ask the client to slow down
796 return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
797 }
798 ret = cls_2pc_queue_reserve_result(obl, res_id);
799 if (ret < 0) {
800 ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
801 return ret;
802 }
803 }
804 res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id);
805 }
806 return 0;
807 }
808
809 int publish_commit(rgw::sal::RGWObject* obj,
810 uint64_t size,
811 const ceph::real_time& mtime,
812 const std::string& etag,
813 EventType event_type,
814 reservation_t& res,
815 const DoutPrefixProvider *dpp)
816 {
817 for (auto& topic : res.topics) {
818 if (topic.cfg.dest.persistent && topic.res_id == cls_2pc_reservation::NO_ID) {
819 // nothing to commit or already committed/aborted
820 continue;
821 }
822 event_entry_t event_entry;
823 populate_event_from_request(res.s, obj, size, mtime, etag, event_type, event_entry.event);
824 event_entry.event.configurationId = topic.configurationId;
825 event_entry.event.opaque_data = topic.cfg.opaque_data;
826 if (topic.cfg.dest.persistent) {
827 event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
828 event_entry.push_endpoint_args = std::move(topic.cfg.dest.push_endpoint_args);
829 event_entry.arn_topic = std::move(topic.cfg.dest.arn_topic);
830 bufferlist bl;
831 encode(event_entry, bl);
832 const auto& queue_name = topic.cfg.dest.arn_topic;
833 if (bl.length() > res.size) {
834 // try to make a larger reservation, fail only if this is not possible
835 ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length() << " exceeded reserved size: " << res.size <<
836         ". trying to make a larger reservation on queue: " << queue_name << dendl;
837 // first cancel the existing reservation
838 librados::ObjectWriteOperation op;
839 cls_2pc_queue_abort(op, topic.res_id);
840 auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
841 topic.cfg.dest.arn_topic, &op,
842 res.s->yield);
843 if (ret < 0) {
844 ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id <<
845 " when trying to make a larger reservation on queue: " << queue_name
846 << ". error: " << ret << dendl;
847 return ret;
848 }
849 // now try to make a bigger one
850 bufferlist obl;
851 int rval;
852 cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
853 ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
854 queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC);
855 if (ret < 0) {
856 ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: " << queue_name
857 << ". error: " << ret << dendl;
858 return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
859 }
860 ret = cls_2pc_queue_reserve_result(obl, topic.res_id);
861 if (ret < 0) {
862 ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for extra space. error: " << ret << dendl;
863 return ret;
864 }
865 }
866 std::vector<bufferlist> bl_data_vec{std::move(bl)};
867 librados::ObjectWriteOperation op;
868 cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
869 const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
870 queue_name, &op,
871 res.s->yield);
872 topic.res_id = cls_2pc_reservation::NO_ID;
873 if (ret < 0) {
874 ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: " << queue_name
875 << ". error: " << ret << dendl;
876 return ret;
877 }
878 } else {
879 try {
880 // TODO add endpoint LRU cache
881 const auto push_endpoint = RGWPubSubEndpoint::create(topic.cfg.dest.push_endpoint,
882 topic.cfg.dest.arn_topic,
883 RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp),
884 res.s->cct);
885 ldpp_dout(dpp, 20) << "INFO: push endpoint created: " << topic.cfg.dest.push_endpoint << dendl;
886 const auto ret = push_endpoint->send_to_completion_async(res.s->cct, event_entry.event, res.s->yield);
887 if (ret < 0) {
888 ldpp_dout(dpp, 1) << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint << " failed. error: " << ret << dendl;
889 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
890 return ret;
891 }
892 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
893 } catch (const RGWPubSubEndpoint::configuration_error& e) {
894 ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: "
895 << topic.cfg.dest.push_endpoint << ". error: " << e.what() << dendl;
896 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
897 return -EINVAL;
898 }
899 }
900 }
901 return 0;
902 }
903
904 int publish_abort(const DoutPrefixProvider *dpp, reservation_t& res) {
905 for (auto& topic : res.topics) {
906 if (!topic.cfg.dest.persistent || topic.res_id == cls_2pc_reservation::NO_ID) {
907 // nothing to abort or already committed/aborted
908 continue;
909 }
910 const auto& queue_name = topic.cfg.dest.arn_topic;
911 librados::ObjectWriteOperation op;
912 cls_2pc_queue_abort(op, topic.res_id);
913 const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
914 queue_name, &op,
915 res.s->yield);
916 if (ret < 0) {
917 ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id <<
918 " from queue: " << queue_name << ". error: " << ret << dendl;
919 return ret;
920 }
921 topic.res_id = cls_2pc_reservation::NO_ID;
922 }
923 return 0;
924 }
925
926 reservation_t::~reservation_t() {
927 publish_abort(dpp, *this);
928 }
929
930 }
931