]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "rgw_notify.h" | |
f67539c2 TL |
5 | #include "cls/2pc_queue/cls_2pc_queue_client.h" |
6 | #include "cls/lock/cls_lock_client.h" | |
eafe8130 TL |
7 | #include <memory> |
8 | #include <boost/algorithm/hex.hpp> | |
f67539c2 TL |
9 | #include <boost/context/protected_fixedsize_stack.hpp> |
10 | #include <spawn/spawn.hpp> | |
20effc67 | 11 | #include "rgw_sal_rados.h" |
eafe8130 TL |
12 | #include "rgw_pubsub.h" |
13 | #include "rgw_pubsub_push.h" | |
14 | #include "rgw_perf_counters.h" | |
15 | #include "common/dout.h" | |
f67539c2 | 16 | #include <chrono> |
eafe8130 TL |
17 | |
18 | #define dout_subsys ceph_subsys_rgw | |
19 | ||
20 | namespace rgw::notify { | |
21 | ||
f67539c2 TL |
// a notification as it is stored in a queue entry:
// process_entry() decodes it from the entry's data and pushes it to its endpoint
struct event_entry_t {
  // the event that is sent to the endpoint
  rgw_pubsub_s3_event event;
  // endpoint passed to RGWPubSubEndpoint::create()
  std::string push_endpoint;
  // endpoint arguments, parsed as RGWHTTPArgs when the endpoint is created
  std::string push_endpoint_args;
  // topic passed to RGWPubSubEndpoint::create()
  std::string arn_topic;

  // serialize all fields in declaration order
  // decode() must read them back in exactly the same order
  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(event, bl);
    encode(push_endpoint, bl);
    encode(push_endpoint_args, bl);
    encode(arn_topic, bl);
    ENCODE_FINISH(bl);
  }

  // deserialize all fields, in the order written by encode()
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(event, bl);
    decode(push_endpoint, bl);
    decode(push_endpoint_args, bl);
    decode(arn_topic, bl);
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(event_entry_t)
47 | ||
48 | using queues_t = std::set<std::string>; | |
49 | ||
50 | // use mmap/mprotect to allocate 128k coroutine stacks | |
51 | auto make_stack_allocator() { | |
52 | return boost::context::protected_fixedsize_stack{128*1024}; | |
53 | } | |
54 | ||
aee94f69 TL |
55 | const std::string Q_LIST_OBJECT_NAME = "queues_list_object"; |
56 | ||
b3b6e05e | 57 | class Manager : public DoutPrefixProvider { |
f67539c2 TL |
// NOTE: the declaration order below is also the initialization order used by the constructor

// size passed to cls_2pc_queue_init() when a queue is created for a topic
const size_t max_queue_size;
// base wait between two scans of the queue list (a random jitter is added)
const uint32_t queues_update_period_ms;
// base wait before the next scan of the queue list after an error
const uint32_t queues_update_retry_ms;
// sleep before listing a queue again when the previous pass did not process entries
const uint32_t queue_idle_sleep_us;
// duration given to the exclusive lock taken on each queue
const utime_t failover_time;
CephContext* const cct;
static constexpr auto COOKIE_LEN = 16;
// random alphanumeric cookie used when locking queues and asserting the lock
const std::string lock_cookie;
boost::asio::io_context io_context;
// created with make_work_guard(io_context) in the constructor, reset in the destructor
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard;
// number of threads calling io_context.run()
const uint32_t worker_count;
std::vector<std::thread> workers;
// reservations older than this are expired by the cleanup coroutine
const uint32_t stale_reservations_period_s;
// wait between two reservation cleanup passes on a queue
const uint32_t reservations_cleanup_period_s;
public:
// public since it is read by the free function remove_persistent_topic()
librados::IoCtx& rados_ioctx;
private:

// DoutPrefixProvider interface, this object is the "dpp" of all log lines it emits
CephContext *get_cct() const override { return cct; }
unsigned get_subsys() const override { return dout_subsys; }
std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw notify: "; }
79 | ||
f67539c2 TL |
80 | // read the list of queues from the queue list object |
81 | int read_queue_list(queues_t& queues, optional_yield y) { | |
82 | constexpr auto max_chunk = 1024U; | |
83 | std::string start_after; | |
84 | bool more = true; | |
85 | int rval; | |
86 | while (more) { | |
87 | librados::ObjectReadOperation op; | |
88 | queues_t queues_chunk; | |
89 | op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval); | |
b3b6e05e | 90 | const auto ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y); |
f67539c2 TL |
91 | if (ret == -ENOENT) { |
92 | // queue list object was not created - nothing to do | |
93 | return 0; | |
94 | } | |
95 | if (ret < 0) { | |
96 | // TODO: do we need to check on rval as well as ret? | |
b3b6e05e | 97 | ldpp_dout(this, 1) << "ERROR: failed to read queue list. error: " << ret << dendl; |
f67539c2 TL |
98 | return ret; |
99 | } | |
100 | queues.merge(queues_chunk); | |
101 | } | |
102 | return 0; | |
103 | } | |
104 | ||
105 | // set m1 to be the minimum between m1 and m2 | |
106 | static int set_min_marker(std::string& m1, const std::string m2) { | |
107 | cls_queue_marker mr1; | |
108 | cls_queue_marker mr2; | |
109 | if (mr1.from_str(m1.c_str()) < 0 || mr2.from_str(m2.c_str()) < 0) { | |
110 | return -EINVAL; | |
111 | } | |
112 | if (mr2.gen <= mr1.gen && mr2.offset < mr1.offset) { | |
113 | m1 = m2; | |
114 | } | |
115 | return 0; | |
116 | } | |
117 | ||
118 | using Clock = ceph::coarse_mono_clock; | |
119 | using Executor = boost::asio::io_context::executor_type; | |
120 | using Timer = boost::asio::basic_waitable_timer<Clock, | |
121 | boost::asio::wait_traits<Clock>, Executor>; | |
122 | ||
123 | class tokens_waiter { | |
124 | const std::chrono::hours infinite_duration; | |
125 | size_t pending_tokens; | |
126 | Timer timer; | |
127 | ||
128 | struct token { | |
129 | tokens_waiter& waiter; | |
130 | token(tokens_waiter& _waiter) : waiter(_waiter) { | |
131 | ++waiter.pending_tokens; | |
132 | } | |
133 | ||
134 | ~token() { | |
135 | --waiter.pending_tokens; | |
136 | if (waiter.pending_tokens == 0) { | |
137 | waiter.timer.cancel(); | |
138 | } | |
139 | } | |
140 | }; | |
141 | ||
142 | public: | |
143 | ||
144 | tokens_waiter(boost::asio::io_context& io_context) : | |
145 | infinite_duration(1000), | |
146 | pending_tokens(0), | |
147 | timer(io_context) {} | |
148 | ||
20effc67 | 149 | void async_wait(yield_context yield) { |
522d829b TL |
150 | if (pending_tokens == 0) { |
151 | return; | |
152 | } | |
f67539c2 TL |
153 | timer.expires_from_now(infinite_duration); |
154 | boost::system::error_code ec; | |
155 | timer.async_wait(yield[ec]); | |
156 | ceph_assert(ec == boost::system::errc::operation_canceled); | |
157 | } | |
158 | ||
159 | token make_token() { | |
160 | return token(*this); | |
161 | } | |
162 | }; | |
163 | ||
164 | // processing of a specific entry | |
165 | // return whether processing was successfull (true) or not (false) | |
20effc67 | 166 | bool process_entry(const cls_queue_entry& entry, yield_context yield) { |
f67539c2 TL |
167 | event_entry_t event_entry; |
168 | auto iter = entry.data.cbegin(); | |
169 | try { | |
170 | decode(event_entry, iter); | |
171 | } catch (buffer::error& err) { | |
b3b6e05e | 172 | ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl; |
f67539c2 TL |
173 | return false; |
174 | } | |
175 | try { | |
176 | // TODO move endpoint creation to queue level | |
177 | const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic, | |
b3b6e05e | 178 | RGWHTTPArgs(event_entry.push_endpoint_args, this), |
f67539c2 | 179 | cct); |
b3b6e05e | 180 | ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint << |
f67539c2 TL |
181 | " for entry: " << entry.marker << dendl; |
182 | const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield)); | |
183 | if (ret < 0) { | |
b3b6e05e | 184 | ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint |
f67539c2 TL |
185 | << " failed. error: " << ret << " (will retry)" << dendl; |
186 | return false; | |
187 | } else { | |
b3b6e05e | 188 | ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint |
f67539c2 TL |
189 | << " ok" << dendl; |
190 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); | |
191 | return true; | |
192 | } | |
193 | } catch (const RGWPubSubEndpoint::configuration_error& e) { | |
b3b6e05e | 194 | ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: " |
f67539c2 TL |
195 | << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl; |
196 | return false; | |
197 | } | |
198 | } | |
199 | ||
// clean stale reservation from queue
// runs as a coroutine: every reservations_cleanup_period_s seconds it expires the
// reservations that are older than stale_reservations_period_s seconds
// the loop ends when the queue does not exist (-ENOENT) or the lock assertion fails (-EBUSY)
void cleanup_queue(const std::string& queue_name, yield_context yield) {
  while (true) {
    ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
    const auto now = ceph::coarse_real_time::clock::now();
    const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
    librados::ObjectWriteOperation op;
    op.assert_exists();
    rados::cls::lock::assert_locked(&op, queue_name+"_lock", 
      ClsLockType::EXCLUSIVE,
      lock_cookie, 
      "" /*no tag*/);
    cls_2pc_queue_expire_reservations(op, stale_time);
    // check ownership and do reservation cleanup in one batch
    auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
    if (ret == -ENOENT) {
      // queue was deleted
      ldpp_dout(this, 5) << "INFO: queue: " 
        << queue_name << ". was removed. cleanup will stop" << dendl;
      return;
    }
    if (ret == -EBUSY) {
      ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
      return;
    }
    if (ret < 0) {
      // any other error is only logged, cleanup is tried again after the wait below
      ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
        << ". error: " << ret << dendl;
    }
    // wait for the cleanup period before the next pass
    Timer timer(io_context);
    timer.expires_from_now(std::chrono::seconds(reservations_cleanup_period_s));
    boost::system::error_code ec;
    timer.async_wait(yield[ec]);
  }
}
235 | ||
// processing of a specific queue
// runs as a coroutine, and repeats the following until the queue does not exist (-ENOENT)
// or the lock assertion fails (-EBUSY):
// (1) list up to max_elements entries, always from the beginning of the queue
// (2) process every entry in its own coroutine, and wait for all of them
// (3) if any entry was processed ok, remove entries from the queue up to the end marker,
//     where the end marker is lowered to the marker of an entry that failed
void process_queue(const std::string& queue_name, yield_context yield) {
  constexpr auto max_elements = 1024;
  auto is_idle = false;
  // left empty, so that listing always starts from the beginning of the queue
  const std::string start_marker;

  // start the cleanup coroutine for the queue
  spawn::spawn(io_context, [this, queue_name](yield_context yield) {
          cleanup_queue(queue_name, yield);
          }, make_stack_allocator());

  while (true) {
    // if queue was empty the last time, sleep for idle timeout
    if (is_idle) {
      Timer timer(io_context);
      timer.expires_from_now(std::chrono::microseconds(queue_idle_sleep_us));
      boost::system::error_code ec;
      timer.async_wait(yield[ec]);
    }

    // get list of entries in the queue
    // idle is assumed until entries are found, so every "continue" below ends in the idle sleep
    is_idle = true;
    bool truncated = false;
    std::string end_marker;
    std::vector<cls_queue_entry> entries;
    auto total_entries = 0U;
    {
      librados::ObjectReadOperation op;
      op.assert_exists();
      bufferlist obl;
      int rval;
      rados::cls::lock::assert_locked(&op, queue_name+"_lock", 
        ClsLockType::EXCLUSIVE,
        lock_cookie, 
        "" /*no tag*/);
      cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval);
      // check ownership and list entries in one batch
      auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
      if (ret == -ENOENT) {
        // queue was deleted
        ldpp_dout(this, 5) << "INFO: queue: " 
          << queue_name << ". was removed. processing will stop" << dendl;
        return;
      }
      if (ret == -EBUSY) {
        ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
        return;
      }
      if (ret < 0) {
        ldpp_dout(this, 5) << "WARNING: failed to get list of entries in queue and/or lock queue: " 
          << queue_name << ". error: " << ret << " (will retry)" << dendl;
        continue;
      }
      ret = cls_2pc_queue_list_entries_result(obl, entries, &truncated, end_marker);
      if (ret < 0) {
        ldpp_dout(this, 5) << "WARNING: failed to parse list of entries in queue: " 
          << queue_name << ". error: " << ret << " (will retry)" << dendl;
        continue;
      }
    }
    total_entries = entries.size();
    if (total_entries == 0) {
      // nothing in the queue
      continue;
    }
    // log when queue is not idle
    ldpp_dout(this, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name <<
      ". end marker is: " << end_marker << dendl;

    is_idle = false;
    auto has_error = false;
    auto remove_entries = false;
    auto entry_idx = 1U;
    tokens_waiter waiter(io_context);
    for (auto& entry : entries) {
      if (has_error) {
        // bail out on first error
        break;
      }
      // TODO pass entry pointer instead of by-value
      // the coroutine shares end_marker, remove_entries, has_error and waiter with this
      // function by reference, and holds a token of the waiter for as long as it runs
      spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](yield_context yield) {
          const auto token = waiter.make_token();
          if (process_entry(entry, yield)) {
            ldpp_dout(this, 20) << "INFO: processing of entry: " << 
              entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl;
            remove_entries = true;
          } else {
            // the failed entry must stay in the queue, so removal has to stop before it
            if (set_min_marker(end_marker, entry.marker) < 0) {
              ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
            } else {
              ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
            }
            has_error = true;
            ldpp_dout(this, 20) << "INFO: processing of entry: " << 
              entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
          } 
      }, make_stack_allocator());
      ++entry_idx;
    }

    // wait for all pending work to finish
    waiter.async_wait(yield);

    // delete all published entries from queue
    if (remove_entries) {
      librados::ObjectWriteOperation op;
      op.assert_exists();
      rados::cls::lock::assert_locked(&op, queue_name+"_lock", 
        ClsLockType::EXCLUSIVE,
        lock_cookie, 
        "" /*no tag*/);
      cls_2pc_queue_remove_entries(op, end_marker); 
      // check ownership and deleted entries in one batch
      const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); 
      if (ret == -ENOENT) {
        // queue was deleted
        ldpp_dout(this, 5) << "INFO: queue: " 
          << queue_name << ". was removed. processing will stop" << dendl;
        return;
      }
      if (ret == -EBUSY) {
        ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
        return;
      }
      if (ret < 0) {
        // only logged: the entries are still in the queue and are listed again in the next pass
        ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker <<  " from queue: " 
          << queue_name << ". error: " << ret << dendl;
      } else {
        ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker <<  " from queue: " 
          << queue_name << dendl;
      }
    }
  }
}
370 | ||
371 | // list of owned queues | |
372 | using owned_queues_t = std::unordered_set<std::string>; | |
373 | ||
374 | // process all queues | |
375 | // find which of the queues is owned by this daemon and process it | |
20effc67 | 376 | void process_queues(yield_context yield) { |
f67539c2 TL |
377 | auto has_error = false; |
378 | owned_queues_t owned_queues; | |
379 | ||
380 | // add randomness to the duration between queue checking | |
381 | // to make sure that different daemons are not synced | |
382 | std::random_device seed; | |
383 | std::mt19937 rnd_gen(seed()); | |
384 | const auto min_jitter = 100; // ms | |
385 | const auto max_jitter = 500; // ms | |
386 | std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter); | |
387 | ||
522d829b TL |
388 | std::vector<std::string> queue_gc; |
389 | std::mutex queue_gc_lock; | |
f67539c2 TL |
390 | while (true) { |
391 | Timer timer(io_context); | |
392 | const auto duration = (has_error ? | |
393 | std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) + | |
394 | std::chrono::milliseconds(duration_jitter(rnd_gen)); | |
395 | timer.expires_from_now(duration); | |
396 | const auto tp = ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration); | |
b3b6e05e | 397 | ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp) << dendl; |
f67539c2 TL |
398 | boost::system::error_code ec; |
399 | timer.async_wait(yield[ec]); | |
400 | ||
401 | queues_t queues; | |
402 | auto ret = read_queue_list(queues, optional_yield(io_context, yield)); | |
403 | if (ret < 0) { | |
404 | has_error = true; | |
405 | continue; | |
406 | } | |
407 | ||
f67539c2 TL |
408 | for (const auto& queue_name : queues) { |
409 | // try to lock the queue to check if it is owned by this rgw | |
410 | // or if ownershif needs to be taken | |
411 | librados::ObjectWriteOperation op; | |
412 | op.assert_exists(); | |
413 | rados::cls::lock::lock(&op, queue_name+"_lock", | |
414 | ClsLockType::EXCLUSIVE, | |
415 | lock_cookie, | |
416 | "" /*no tag*/, | |
417 | "" /*no description*/, | |
418 | failover_time, | |
419 | LOCK_FLAG_MAY_RENEW); | |
420 | ||
b3b6e05e | 421 | ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); |
f67539c2 TL |
422 | if (ret == -EBUSY) { |
423 | // lock is already taken by another RGW | |
b3b6e05e | 424 | ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl; |
f67539c2 TL |
425 | // if queue was owned by this RGW, processing should be stopped, queue would be deleted from list afterwards |
426 | continue; | |
427 | } | |
428 | if (ret == -ENOENT) { | |
429 | // queue is deleted - processing will stop the next time we try to read from the queue | |
b3b6e05e | 430 | ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " should not be locked - already deleted" << dendl; |
f67539c2 TL |
431 | continue; |
432 | } | |
433 | if (ret < 0) { | |
434 | // failed to lock for another reason, continue to process other queues | |
b3b6e05e | 435 | ldpp_dout(this, 1) << "ERROR: failed to lock queue: " << queue_name << ". error: " << ret << dendl; |
f67539c2 TL |
436 | has_error = true; |
437 | continue; | |
438 | } | |
439 | // add queue to list of owned queues | |
440 | if (owned_queues.insert(queue_name).second) { | |
b3b6e05e | 441 | ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl; |
f67539c2 | 442 | // start processing this queue |
20effc67 | 443 | spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](yield_context yield) { |
f67539c2 TL |
444 | process_queue(queue_name, yield); |
445 | // if queue processing ended, it measn that the queue was removed or not owned anymore | |
446 | // mark it for deletion | |
447 | std::lock_guard lock_guard(queue_gc_lock); | |
448 | queue_gc.push_back(queue_name); | |
b3b6e05e | 449 | ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl; |
f67539c2 TL |
450 | }, make_stack_allocator()); |
451 | } else { | |
b3b6e05e | 452 | ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl; |
f67539c2 TL |
453 | } |
454 | } | |
455 | // erase all queue that were deleted | |
456 | { | |
457 | std::lock_guard lock_guard(queue_gc_lock); | |
458 | std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) { | |
459 | owned_queues.erase(queue_name); | |
b3b6e05e | 460 | ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl; |
f67539c2 TL |
461 | }); |
462 | queue_gc.clear(); | |
463 | } | |
464 | } | |
465 | } | |
466 | ||
467 | public: | |
468 | ||
469 | ~Manager() { | |
470 | work_guard.reset(); | |
471 | io_context.stop(); | |
472 | std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); }); | |
473 | } | |
474 | ||
475 | // ctor: start all threads | |
476 | Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms, | |
477 | uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms, | |
478 | uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s, | |
20effc67 | 479 | uint32_t _worker_count, rgw::sal::RadosStore* store) : |
f67539c2 TL |
480 | max_queue_size(_max_queue_size), |
481 | queues_update_period_ms(_queues_update_period_ms), | |
482 | queues_update_retry_ms(_queues_update_retry_ms), | |
483 | queue_idle_sleep_us(_queue_idle_sleep_us), | |
484 | failover_time(std::chrono::milliseconds(failover_time_ms)), | |
485 | cct(_cct), | |
f67539c2 TL |
486 | lock_cookie(gen_rand_alphanumeric(cct, COOKIE_LEN)), |
487 | work_guard(boost::asio::make_work_guard(io_context)), | |
488 | worker_count(_worker_count), | |
489 | stale_reservations_period_s(_stale_reservations_period_s), | |
aee94f69 TL |
490 | reservations_cleanup_period_s(_reservations_cleanup_period_s), |
491 | rados_ioctx(store->getRados()->get_notif_pool_ctx()) | |
f67539c2 | 492 | { |
20effc67 | 493 | spawn::spawn(io_context, [this] (yield_context yield) { |
f67539c2 TL |
494 | process_queues(yield); |
495 | }, make_stack_allocator()); | |
496 | ||
497 | // start the worker threads to do the actual queue processing | |
498 | const std::string WORKER_THREAD_NAME = "notif-worker"; | |
499 | for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) { | |
522d829b TL |
500 | workers.emplace_back([this]() { |
501 | try { | |
502 | io_context.run(); | |
503 | } catch (const std::exception& err) { | |
504 | ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl; | |
505 | throw(err); | |
506 | } | |
507 | }); | |
20effc67 | 508 | const auto rc = ceph_pthread_setname(workers.back().native_handle(), |
522d829b | 509 | (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str()); |
f67539c2 TL |
510 | ceph_assert(rc == 0); |
511 | } | |
b3b6e05e | 512 | ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl; |
f67539c2 TL |
513 | } |
514 | ||
515 | int add_persistent_topic(const std::string& topic_name, optional_yield y) { | |
516 | if (topic_name == Q_LIST_OBJECT_NAME) { | |
b3b6e05e | 517 | ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl; |
f67539c2 TL |
518 | return -EINVAL; |
519 | } | |
520 | librados::ObjectWriteOperation op; | |
521 | op.create(true); | |
522 | cls_2pc_queue_init(op, topic_name, max_queue_size); | |
b3b6e05e | 523 | auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y); |
f67539c2 TL |
524 | if (ret == -EEXIST) { |
525 | // queue already exists - nothing to do | |
b3b6e05e | 526 | ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl; |
f67539c2 TL |
527 | return 0; |
528 | } | |
529 | if (ret < 0) { | |
530 | // failed to create queue | |
b3b6e05e | 531 | ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl; |
f67539c2 TL |
532 | return ret; |
533 | } | |
534 | ||
535 | bufferlist empty_bl; | |
536 | std::map<std::string, bufferlist> new_topic{{topic_name, empty_bl}}; | |
537 | op.omap_set(new_topic); | |
b3b6e05e | 538 | ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y); |
f67539c2 | 539 | if (ret < 0) { |
b3b6e05e | 540 | ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl; |
f67539c2 TL |
541 | return ret; |
542 | } | |
b3b6e05e | 543 | ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list" << dendl; |
f67539c2 TL |
544 | return 0; |
545 | } | |
f67539c2 TL |
546 | }; |
547 | ||
// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
// allocated by init() and deleted by shutdown()
static Manager* s_manager = nullptr;

// the values that init() passes to the manager
constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30;     // check queue list every 30seconds
constexpr uint32_t Q_LIST_RETRY_MSEC = 1000;         // retry every second if queue list update failed
constexpr uint32_t IDLE_TIMEOUT_USEC = 100*1000;     // idle sleep 100ms
constexpr uint32_t FAILOVER_TIME_MSEC = 3*Q_LIST_UPDATE_MSEC; // FAILOVER TIME 3x renew time
constexpr uint32_t WORKER_COUNT = 1;                 // 1 worker thread
constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120;   // cleanup reservations that are more than 2 minutes old
constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
561 | ||
20effc67 | 562 | bool init(CephContext* cct, rgw::sal::RadosStore* store, const DoutPrefixProvider *dpp) { |
f67539c2 TL |
563 | if (s_manager) { |
564 | return false; | |
565 | } | |
566 | // TODO: take conf from CephContext | |
567 | s_manager = new Manager(cct, MAX_QUEUE_SIZE, | |
568 | Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC, | |
569 | IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC, | |
570 | STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S, | |
571 | WORKER_COUNT, | |
572 | store); | |
573 | return true; | |
574 | } | |
575 | ||
// delete the manager (its destructor waits for the worker threads to end)
// safe to call when no manager was allocated
void shutdown() {
  delete s_manager;
  // allow for init() to be called again
  s_manager = nullptr;
}
580 | ||
581 | int add_persistent_topic(const std::string& topic_name, optional_yield y) { | |
582 | if (!s_manager) { | |
583 | return -EAGAIN; | |
584 | } | |
585 | return s_manager->add_persistent_topic(topic_name, y); | |
586 | } | |
587 | ||
aee94f69 TL |
588 | int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_name, optional_yield y) { |
589 | librados::ObjectWriteOperation op; | |
590 | op.remove(); | |
591 | auto ret = rgw_rados_operate(dpp, rados_ioctx, topic_name, &op, y); | |
592 | if (ret == -ENOENT) { | |
593 | // queue already removed - nothing to do | |
594 | ldpp_dout(dpp, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl; | |
595 | return 0; | |
596 | } | |
597 | if (ret < 0) { | |
598 | // failed to remove queue | |
599 | ldpp_dout(dpp, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl; | |
600 | return ret; | |
601 | } | |
602 | ||
603 | std::set<std::string> topic_to_remove{{topic_name}}; | |
604 | op.omap_rm_keys(topic_to_remove); | |
605 | ret = rgw_rados_operate(dpp, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y); | |
606 | if (ret < 0) { | |
607 | ldpp_dout(dpp, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl; | |
608 | return ret; | |
609 | } | |
610 | ldpp_dout(dpp, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl; | |
611 | return 0; | |
612 | } | |
613 | ||
f67539c2 TL |
614 | int remove_persistent_topic(const std::string& topic_name, optional_yield y) { |
615 | if (!s_manager) { | |
616 | return -EAGAIN; | |
617 | } | |
aee94f69 | 618 | return remove_persistent_topic(s_manager, s_manager->rados_ioctx, topic_name, y); |
f67539c2 TL |
619 | } |
620 | ||
20effc67 TL |
621 | rgw::sal::Object* get_object_with_atttributes( |
622 | const reservation_t& res, rgw::sal::Object* obj) { | |
f67539c2 | 623 | // in case of copy obj, the tags and metadata are taken from source |
20effc67 | 624 | const auto src_obj = res.src_object ? res.src_object : obj; |
f67539c2 TL |
625 | if (src_obj->get_attrs().empty()) { |
626 | if (!src_obj->get_bucket()) { | |
20effc67 | 627 | src_obj->set_bucket(res.bucket); |
f67539c2 | 628 | } |
1e59de90 TL |
629 | const auto ret = src_obj->get_obj_attrs(res.yield, res.dpp); |
630 | if (ret < 0) { | |
631 | ldpp_dout(res.dpp, 20) << "failed to get attributes from object: " << | |
632 | src_obj->get_key() << ". ret = " << ret << dendl; | |
f67539c2 TL |
633 | return nullptr; |
634 | } | |
635 | } | |
636 | return src_obj; | |
637 | } | |
638 | ||
aee94f69 TL |
639 | static inline void filter_amz_meta(meta_map_t& dest, const meta_map_t& src) { |
640 | std::copy_if(src.cbegin(), src.cend(), | |
641 | std::inserter(dest, dest.end()), | |
642 | [](const auto& m) { | |
643 | return (boost::algorithm::starts_with(m.first, RGW_AMZ_META_PREFIX)); | |
644 | }); | |
645 | } | |
646 | ||
647 | ||
20effc67 TL |
648 | static inline void metadata_from_attributes( |
649 | reservation_t& res, rgw::sal::Object* obj) { | |
650 | auto& metadata = res.x_meta_map; | |
651 | const auto src_obj = get_object_with_atttributes(res, obj); | |
f67539c2 TL |
652 | if (!src_obj) { |
653 | return; | |
654 | } | |
1e59de90 | 655 | res.metadata_fetched_from_attributes = true; |
f67539c2 TL |
656 | for (auto& attr : src_obj->get_attrs()) { |
657 | if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) { | |
658 | std::string_view key(attr.first); | |
659 | key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1); | |
660 | // we want to pass a null terminated version | |
661 | // of the bufferlist, hence "to_str().c_str()" | |
662 | metadata.emplace(key, attr.second.to_str().c_str()); | |
663 | } | |
664 | } | |
665 | } | |
666 | ||
20effc67 TL |
667 | static inline void tags_from_attributes( |
668 | const reservation_t& res, rgw::sal::Object* obj, KeyMultiValueMap& tags) { | |
669 | const auto src_obj = get_object_with_atttributes(res, obj); | |
f67539c2 TL |
670 | if (!src_obj) { |
671 | return; | |
672 | } | |
673 | const auto& attrs = src_obj->get_attrs(); | |
674 | const auto attr_iter = attrs.find(RGW_ATTR_TAGS); | |
675 | if (attr_iter != attrs.end()) { | |
676 | auto bliter = attr_iter->second.cbegin(); | |
677 | RGWObjTags obj_tags; | |
678 | try { | |
679 | ::decode(obj_tags, bliter); | |
680 | } catch(buffer::error&) { | |
681 | // not able to decode tags | |
682 | return; | |
683 | } | |
684 | tags = std::move(obj_tags.get_tags()); | |
685 | } | |
686 | } | |
687 | ||
// populate event from request
// res: the reservation of the request. its metadata map is filled from the
//      object attributes here, unless that was already done
// obj: the object of the request. used for the object key if the reservation has no
//      object name, and as the source of attributes if the reservation has no source object
// size, mtime, etag, version: copied into the event as is
// event: the event to fill. configurationId and opaque data are not set here
static inline void populate_event(reservation_t& res,
        rgw::sal::Object* obj,
        uint64_t size,
        const ceph::real_time& mtime, 
        const std::string& etag, 
        const std::string& version,
        EventType event_type,
        rgw_pubsub_s3_event& event) {
  event.eventTime = mtime;
  event.eventName = to_event_string(event_type);
  event.userIdentity = res.user_id;    // user that triggered the change
  event.x_amz_request_id = res.req_id; // request ID of the original change
  event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made
  // configurationId is filled from notification configuration
  event.bucket_name = res.bucket->get_name();
  // owner id is taken from the owner object of the bucket if it has one,
  // and from the bucket info otherwise
  event.bucket_ownerIdentity = res.bucket->get_owner() ?
    res.bucket->get_owner()->get_id().id : res.bucket->get_info().owner.id;
  const auto region = res.store->get_zone()->get_zonegroup().get_api_name();
  rgw::ARN bucket_arn(res.bucket->get_key());
  bucket_arn.region = region; 
  event.bucket_arn = to_string(bucket_arn);
  event.object_key = res.object_name ? *res.object_name : obj->get_name();
  event.object_size = size;
  event.object_etag = etag;
  event.object_versionId = version;
  event.awsRegion = region;
  // use timestamp as per key sequence id (hex encoded)
  // the hex encoding is of the raw bytes of the utime_t object
  const utime_t ts(real_clock::now());
  boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), 
          std::back_inserter(event.object_sequencer));
  set_event_id(event.id, etag, ts);
  event.bucket_id = res.bucket->get_bucket_id();
  // pass meta data
  if (!res.metadata_fetched_from_attributes) {
    // either no metadata exist or no metadata filter was used
    metadata_from_attributes(res, obj);
  }
  event.x_meta_map = res.x_meta_map;
  // pass tags
  if (!res.tagset ||
      (*res.tagset).get_tags().empty()) {
    // try to fetch the tags from the attributes
    tags_from_attributes(res, obj, event.tags);
  } else {
    event.tags = (*res.tagset).get_tags();
  }
  // opaque data will be filled from topic configuration
}
737 | ||
20effc67 TL |
738 | static inline bool notification_match(reservation_t& res, |
739 | const rgw_pubsub_topic_filter& filter, | |
740 | EventType event, | |
741 | const RGWObjTags* req_tags) { | |
742 | if (!match(filter.events, event)) { | |
eafe8130 TL |
743 | return false; |
744 | } | |
522d829b TL |
745 | const auto obj = res.object; |
746 | if (!match(filter.s3_filter.key_filter, | |
747 | res.object_name ? *res.object_name : obj->get_name())) { | |
eafe8130 TL |
748 | return false; |
749 | } | |
f67539c2 TL |
750 | |
751 | if (!filter.s3_filter.metadata_filter.kv.empty()) { | |
752 | // metadata filter exists | |
20effc67 | 753 | if (res.s) { |
aee94f69 | 754 | filter_amz_meta(res.x_meta_map, res.s->info.x_meta_map); |
20effc67 TL |
755 | } |
756 | metadata_from_attributes(res, obj); | |
757 | if (!match(filter.s3_filter.metadata_filter, res.x_meta_map)) { | |
522d829b | 758 | return false; |
f67539c2 | 759 | } |
eafe8130 | 760 | } |
f67539c2 TL |
761 | |
762 | if (!filter.s3_filter.tag_filter.kv.empty()) { | |
763 | // tag filter exists | |
764 | if (req_tags) { | |
765 | // tags in the request | |
766 | if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) { | |
767 | return false; | |
768 | } | |
20effc67 | 769 | } else if (res.tagset && !(*res.tagset).get_tags().empty()) { |
f67539c2 | 770 | // tags were cached in req_state |
20effc67 | 771 | if (!match(filter.s3_filter.tag_filter, (*res.tagset).get_tags())) { |
f67539c2 TL |
772 | return false; |
773 | } | |
774 | } else { | |
775 | // try to fetch tags from the attributes | |
20effc67 TL |
776 | KeyMultiValueMap tags; |
777 | tags_from_attributes(res, obj, tags); | |
f67539c2 TL |
778 | if (!match(filter.s3_filter.tag_filter, tags)) { |
779 | return false; | |
780 | } | |
781 | } | |
9f95a23c | 782 | } |
f67539c2 | 783 | |
eafe8130 TL |
784 | return true; |
785 | } | |
786 | ||
20effc67 TL |
787 | int publish_reserve(const DoutPrefixProvider* dpp, |
788 | EventType event_type, | |
789 | reservation_t& res, | |
790 | const RGWObjTags* req_tags) | |
f67539c2 | 791 | { |
1e59de90 TL |
792 | const RGWPubSub ps(res.store, res.user_tenant); |
793 | const RGWPubSub::Bucket ps_bucket(ps, res.bucket); | |
f67539c2 | 794 | rgw_pubsub_bucket_topics bucket_topics; |
1e59de90 | 795 | auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield); |
f67539c2 TL |
796 | if (rc < 0) { |
797 | // failed to fetch bucket topics | |
798 | return rc; | |
799 | } | |
800 | for (const auto& bucket_topic : bucket_topics.topics) { | |
801 | const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second; | |
802 | const rgw_pubsub_topic& topic_cfg = topic_filter.topic; | |
522d829b | 803 | if (!notification_match(res, topic_filter, event_type, req_tags)) { |
f67539c2 TL |
804 | // notification does not apply to req_state |
805 | continue; | |
806 | } | |
20effc67 | 807 | ldpp_dout(res.dpp, 20) << "INFO: notification: '" << topic_filter.s3_id << |
f67539c2 | 808 | "' on topic: '" << topic_cfg.dest.arn_topic << |
20effc67 | 809 | "' and bucket: '" << res.bucket->get_name() << |
f67539c2 TL |
810 | "' (unique topic: '" << topic_cfg.name << |
811 | "') apply to event of type: '" << to_string(event_type) << "'" << dendl; | |
812 | ||
813 | cls_2pc_reservation::id_t res_id; | |
814 | if (topic_cfg.dest.persistent) { | |
815 | // TODO: take default reservation size from conf | |
816 | constexpr auto DEFAULT_RESERVATION = 4*1024U; // 4K | |
817 | res.size = DEFAULT_RESERVATION; | |
818 | librados::ObjectWriteOperation op; | |
819 | bufferlist obl; | |
820 | int rval; | |
821 | const auto& queue_name = topic_cfg.dest.arn_topic; | |
822 | cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval); | |
20effc67 TL |
823 | auto ret = rgw_rados_operate( |
824 | res.dpp, res.store->getRados()->get_notif_pool_ctx(), | |
825 | queue_name, &op, res.yield, librados::OPERATION_RETURNVEC); | |
f67539c2 | 826 | if (ret < 0) { |
20effc67 TL |
827 | ldpp_dout(res.dpp, 1) << |
828 | "ERROR: failed to reserve notification on queue: " | |
829 | << queue_name << ". error: " << ret << dendl; | |
f67539c2 TL |
830 | // if no space is left in queue we ask client to slow down |
831 | return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret; | |
832 | } | |
833 | ret = cls_2pc_queue_reserve_result(obl, res_id); | |
834 | if (ret < 0) { | |
20effc67 | 835 | ldpp_dout(res.dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl; |
f67539c2 TL |
836 | return ret; |
837 | } | |
838 | } | |
839 | res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id); | |
840 | } | |
841 | return 0; | |
842 | } | |
843 | ||
20effc67 TL |
844 | int publish_commit(rgw::sal::Object* obj, |
845 | uint64_t size, | |
846 | const ceph::real_time& mtime, | |
847 | const std::string& etag, | |
848 | const std::string& version, | |
849 | EventType event_type, | |
850 | reservation_t& res, | |
851 | const DoutPrefixProvider* dpp) | |
f67539c2 TL |
852 | { |
853 | for (auto& topic : res.topics) { | |
20effc67 TL |
854 | if (topic.cfg.dest.persistent && |
855 | topic.res_id == cls_2pc_reservation::NO_ID) { | |
f67539c2 TL |
856 | // nothing to commit or already committed/aborted |
857 | continue; | |
858 | } | |
859 | event_entry_t event_entry; | |
20effc67 | 860 | populate_event(res, obj, size, mtime, etag, version, event_type, event_entry.event); |
f67539c2 TL |
861 | event_entry.event.configurationId = topic.configurationId; |
862 | event_entry.event.opaque_data = topic.cfg.opaque_data; | |
863 | if (topic.cfg.dest.persistent) { | |
864 | event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint); | |
20effc67 TL |
865 | event_entry.push_endpoint_args = |
866 | std::move(topic.cfg.dest.push_endpoint_args); | |
867 | event_entry.arn_topic = topic.cfg.dest.arn_topic; | |
f67539c2 TL |
868 | bufferlist bl; |
869 | encode(event_entry, bl); | |
870 | const auto& queue_name = topic.cfg.dest.arn_topic; | |
871 | if (bl.length() > res.size) { | |
872 | // try to make a larger reservation, fail only if this is not possible | |
20effc67 TL |
873 | ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length() |
874 | << " exceeded reserved size: " << res.size | |
875 | << | |
876 | " . trying to make a larger reservation on queue:" << queue_name | |
877 | << dendl; | |
f67539c2 TL |
878 | // first cancel the existing reservation |
879 | librados::ObjectWriteOperation op; | |
880 | cls_2pc_queue_abort(op, topic.res_id); | |
20effc67 TL |
881 | auto ret = rgw_rados_operate( |
882 | dpp, res.store->getRados()->get_notif_pool_ctx(), | |
883 | topic.cfg.dest.arn_topic, &op, | |
884 | res.yield); | |
f67539c2 | 885 | if (ret < 0) { |
20effc67 TL |
886 | ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " |
887 | << topic.res_id << | |
f67539c2 | 888 | " when trying to make a larger reservation on queue: " << queue_name |
20effc67 | 889 | << ". error: " << ret << dendl; |
f67539c2 TL |
890 | return ret; |
891 | } | |
892 | // now try to make a bigger one | |
20effc67 | 893 | buffer::list obl; |
f67539c2 TL |
894 | int rval; |
895 | cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval); | |
20effc67 TL |
896 | ret = rgw_rados_operate( |
897 | dpp, res.store->getRados()->get_notif_pool_ctx(), | |
898 | queue_name, &op, res.yield, librados::OPERATION_RETURNVEC); | |
f67539c2 | 899 | if (ret < 0) { |
20effc67 TL |
900 | ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: " |
901 | << queue_name | |
902 | << ". error: " << ret << dendl; | |
f67539c2 TL |
903 | return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret; |
904 | } | |
905 | ret = cls_2pc_queue_reserve_result(obl, topic.res_id); | |
906 | if (ret < 0) { | |
20effc67 TL |
907 | ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for " |
908 | "extra space. error: " << ret << dendl; | |
f67539c2 TL |
909 | return ret; |
910 | } | |
911 | } | |
20effc67 | 912 | std::vector<buffer::list> bl_data_vec{std::move(bl)}; |
f67539c2 TL |
913 | librados::ObjectWriteOperation op; |
914 | cls_2pc_queue_commit(op, bl_data_vec, topic.res_id); | |
20effc67 TL |
915 | const auto ret = rgw_rados_operate( |
916 | dpp, res.store->getRados()->get_notif_pool_ctx(), | |
917 | queue_name, &op, res.yield); | |
f67539c2 TL |
918 | topic.res_id = cls_2pc_reservation::NO_ID; |
919 | if (ret < 0) { | |
20effc67 TL |
920 | ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: " |
921 | << queue_name << ". error: " << ret | |
922 | << dendl; | |
f67539c2 TL |
923 | return ret; |
924 | } | |
925 | } else { | |
926 | try { | |
927 | // TODO add endpoint LRU cache | |
20effc67 TL |
928 | const auto push_endpoint = RGWPubSubEndpoint::create( |
929 | topic.cfg.dest.push_endpoint, | |
930 | topic.cfg.dest.arn_topic, | |
931 | RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp), | |
932 | dpp->get_cct()); | |
933 | ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: " | |
934 | << topic.cfg.dest.push_endpoint << dendl; | |
935 | const auto ret = push_endpoint->send_to_completion_async( | |
936 | dpp->get_cct(), event_entry.event, res.yield); | |
f67539c2 | 937 | if (ret < 0) { |
20effc67 TL |
938 | ldpp_dout(dpp, 1) << "ERROR: push to endpoint " |
939 | << topic.cfg.dest.push_endpoint | |
940 | << " failed. error: " << ret << dendl; | |
f67539c2 TL |
941 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); |
942 | return ret; | |
eafe8130 | 943 | } |
f67539c2 TL |
944 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); |
945 | } catch (const RGWPubSubEndpoint::configuration_error& e) { | |
b3b6e05e | 946 | ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: " |
f67539c2 TL |
947 | << topic.cfg.dest.push_endpoint << ". error: " << e.what() << dendl; |
948 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); | |
949 | return -EINVAL; | |
950 | } | |
eafe8130 | 951 | } |
f67539c2 TL |
952 | } |
953 | return 0; | |
954 | } | |
eafe8130 | 955 | |
1e59de90 | 956 | int publish_abort(reservation_t& res) { |
f67539c2 | 957 | for (auto& topic : res.topics) { |
20effc67 TL |
958 | if (!topic.cfg.dest.persistent || |
959 | topic.res_id == cls_2pc_reservation::NO_ID) { | |
f67539c2 TL |
960 | // nothing to abort or already committed/aborted |
961 | continue; | |
962 | } | |
963 | const auto& queue_name = topic.cfg.dest.arn_topic; | |
964 | librados::ObjectWriteOperation op; | |
965 | cls_2pc_queue_abort(op, topic.res_id); | |
20effc67 TL |
966 | const auto ret = rgw_rados_operate( |
967 | res.dpp, res.store->getRados()->get_notif_pool_ctx(), | |
968 | queue_name, &op, res.yield); | |
f67539c2 | 969 | if (ret < 0) { |
20effc67 TL |
970 | ldpp_dout(res.dpp, 1) << "ERROR: failed to abort reservation: " |
971 | << topic.res_id << | |
f67539c2 TL |
972 | " from queue: " << queue_name << ". error: " << ret << dendl; |
973 | return ret; | |
974 | } | |
975 | topic.res_id = cls_2pc_reservation::NO_ID; | |
976 | } | |
977 | return 0; | |
978 | } | |
979 | ||
20effc67 TL |
980 | reservation_t::reservation_t(const DoutPrefixProvider* _dpp, |
981 | rgw::sal::RadosStore* _store, | |
1e59de90 | 982 | const req_state* _s, |
20effc67 TL |
983 | rgw::sal::Object* _object, |
984 | rgw::sal::Object* _src_object, | |
1e59de90 TL |
985 | const std::string* _object_name, |
986 | optional_yield y) : | |
987 | dpp(_s), store(_store), s(_s), size(0) /* XXX */, | |
20effc67 TL |
988 | object(_object), src_object(_src_object), bucket(_s->bucket.get()), |
989 | object_name(_object_name), | |
990 | tagset(_s->tagset), | |
1e59de90 | 991 | metadata_fetched_from_attributes(false), |
20effc67 TL |
992 | user_id(_s->user->get_id().id), |
993 | user_tenant(_s->user->get_id().tenant), | |
994 | req_id(_s->req_id), | |
1e59de90 | 995 | yield(y) |
aee94f69 TL |
996 | { |
997 | filter_amz_meta(x_meta_map, _s->info.x_meta_map); | |
998 | } | |
20effc67 TL |
999 | |
1000 | reservation_t::reservation_t(const DoutPrefixProvider* _dpp, | |
1001 | rgw::sal::RadosStore* _store, | |
20effc67 TL |
1002 | rgw::sal::Object* _object, |
1003 | rgw::sal::Object* _src_object, | |
1004 | rgw::sal::Bucket* _bucket, | |
1e59de90 TL |
1005 | const std::string& _user_id, |
1006 | const std::string& _user_tenant, | |
1007 | const std::string& _req_id, | |
20effc67 TL |
1008 | optional_yield y) : |
1009 | dpp(_dpp), store(_store), s(nullptr), size(0) /* XXX */, | |
20effc67 TL |
1010 | object(_object), src_object(_src_object), bucket(_bucket), |
1011 | object_name(nullptr), | |
1e59de90 | 1012 | metadata_fetched_from_attributes(false), |
20effc67 TL |
1013 | user_id(_user_id), |
1014 | user_tenant(_user_tenant), | |
1015 | req_id(_req_id), | |
1016 | yield(y) | |
1017 | {} | |
eafe8130 | 1018 | |
20effc67 TL |
1019 | reservation_t::~reservation_t() { |
1020 | publish_abort(*this); | |
eafe8130 TL |
1021 | } |
1022 | ||
20effc67 | 1023 | } // namespace rgw::notify |