]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rados.h
import ceph nautilus 14.2.2
[ceph.git] / ceph / src / rgw / rgw_cr_rados.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_RGW_CR_RADOS_H
5 #define CEPH_RGW_CR_RADOS_H
6
7 #include <boost/intrusive_ptr.hpp>
8 #include "include/ceph_assert.h"
9 #include "rgw_coroutine.h"
10 #include "rgw_rados.h"
11 #include "common/WorkQueue.h"
12 #include "common/Throttle.h"
13
14 #include <atomic>
15
16 #include "services/svc_sys_obj.h"
17
18 class RGWAsyncRadosRequest : public RefCountedObject {
19 RGWCoroutine *caller;
20 RGWAioCompletionNotifier *notifier;
21
22 int retcode;
23
24 Mutex lock;
25
26 protected:
27 virtual int _send_request() = 0;
28 public:
29 RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0),
30 lock("RGWAsyncRadosRequest::lock") {
31 }
32 ~RGWAsyncRadosRequest() override {
33 if (notifier) {
34 notifier->put();
35 }
36 }
37
38 void send_request() {
39 get();
40 retcode = _send_request();
41 {
42 Mutex::Locker l(lock);
43 if (notifier) {
44 notifier->cb(); // drops its own ref
45 notifier = nullptr;
46 }
47 }
48 put();
49 }
50
51 int get_ret_status() { return retcode; }
52
53 void finish() {
54 {
55 Mutex::Locker l(lock);
56 if (notifier) {
57 // we won't call notifier->cb() to drop its ref, so drop it here
58 notifier->put();
59 notifier = nullptr;
60 }
61 }
62 put();
63 }
64 };
65
66
67 class RGWAsyncRadosProcessor {
68 deque<RGWAsyncRadosRequest *> m_req_queue;
69 std::atomic<bool> going_down = { false };
70 protected:
71 RGWRados *store;
72 ThreadPool m_tp;
73 Throttle req_throttle;
74
75 struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
76 RGWAsyncRadosProcessor *processor;
77 RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
78 : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
79
80 bool _enqueue(RGWAsyncRadosRequest *req) override;
81 void _dequeue(RGWAsyncRadosRequest *req) override {
82 ceph_abort();
83 }
84 bool _empty() override;
85 RGWAsyncRadosRequest *_dequeue() override;
86 using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
87 void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
88 void _dump_queue();
89 void _clear() override {
90 ceph_assert(processor->m_req_queue.empty());
91 }
92 } req_wq;
93
94 public:
95 RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
96 ~RGWAsyncRadosProcessor() {}
97 void start();
98 void stop();
99 void handle_request(RGWAsyncRadosRequest *req);
100 void queue(RGWAsyncRadosRequest *req);
101
102 bool is_going_down() {
103 return going_down;
104 }
105 };
106
107 template <class P>
108 class RGWSimpleWriteOnlyAsyncCR : public RGWSimpleCoroutine {
109 RGWAsyncRadosProcessor *async_rados;
110 RGWRados *store;
111
112 P params;
113
114 class Request : public RGWAsyncRadosRequest {
115 RGWRados *store;
116 P params;
117 protected:
118 int _send_request() override;
119 public:
120 Request(RGWCoroutine *caller,
121 RGWAioCompletionNotifier *cn,
122 RGWRados *store,
123 const P& _params) : RGWAsyncRadosRequest(caller, cn),
124 store(store),
125 params(_params) {}
126 } *req{nullptr};
127
128 public:
129 RGWSimpleWriteOnlyAsyncCR(RGWAsyncRadosProcessor *_async_rados,
130 RGWRados *_store,
131 const P& _params) : RGWSimpleCoroutine(_store->ctx()),
132 async_rados(_async_rados),
133 store(_store),
134 params(_params) {}
135
136 ~RGWSimpleWriteOnlyAsyncCR() override {
137 request_cleanup();
138 }
139 void request_cleanup() override {
140 if (req) {
141 req->finish();
142 req = NULL;
143 }
144 }
145
146 int send_request() override {
147 req = new Request(this,
148 stack->create_completion_notifier(),
149 store,
150 params);
151
152 async_rados->queue(req);
153 return 0;
154 }
155 int request_complete() override {
156 return req->get_ret_status();
157 }
158 };
159
160
161 template <class P, class R>
162 class RGWSimpleAsyncCR : public RGWSimpleCoroutine {
163 RGWAsyncRadosProcessor *async_rados;
164 RGWRados *store;
165
166 P params;
167 std::shared_ptr<R> result;
168
169 class Request : public RGWAsyncRadosRequest {
170 RGWRados *store;
171 P params;
172 std::shared_ptr<R> result;
173 protected:
174 int _send_request() override;
175 public:
176 Request(RGWCoroutine *caller,
177 RGWAioCompletionNotifier *cn,
178 RGWRados *_store,
179 const P& _params,
180 std::shared_ptr<R>& _result) : RGWAsyncRadosRequest(caller, cn),
181 store(_store),
182 params(_params),
183 result(_result) {}
184 } *req{nullptr};
185
186 public:
187 RGWSimpleAsyncCR(RGWAsyncRadosProcessor *_async_rados,
188 RGWRados *_store,
189 const P& _params,
190 std::shared_ptr<R>& _result) : RGWSimpleCoroutine(_store->ctx()),
191 async_rados(_async_rados),
192 store(_store),
193 params(_params),
194 result(_result) {}
195
196 ~RGWSimpleAsyncCR() override {
197 request_cleanup();
198 }
199 void request_cleanup() override {
200 if (req) {
201 req->finish();
202 req = NULL;
203 }
204 }
205
206 int send_request() override {
207 req = new Request(this,
208 stack->create_completion_notifier(),
209 store,
210 params,
211 result);
212
213 async_rados->queue(req);
214 return 0;
215 }
216 int request_complete() override {
217 return req->get_ret_status();
218 }
219 };
220
221
222 class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
223 RGWSysObjectCtx obj_ctx;
224 RGWObjVersionTracker objv_tracker;
225 rgw_raw_obj obj;
226 const bool want_attrs;
227 const bool raw_attrs;
228 protected:
229 int _send_request() override;
230 public:
231 RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
232 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
233 bool want_attrs, bool raw_attrs);
234
235 bufferlist bl;
236 map<string, bufferlist> attrs;
237 };
238
239 class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
240 RGWSI_SysObj *svc;
241 rgw_raw_obj obj;
242 bool exclusive;
243 bufferlist bl;
244
245 protected:
246 int _send_request() override;
247 public:
248 RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
249 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
250 bool _exclusive, bufferlist _bl);
251
252 RGWObjVersionTracker objv_tracker;
253 };
254
255 class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
256 RGWSI_SysObj *svc;
257 rgw_raw_obj obj;
258 map<string, bufferlist> attrs;
259
260 protected:
261 int _send_request() override;
262 public:
263 RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
264 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
265 map<string, bufferlist> _attrs);
266
267 RGWObjVersionTracker objv_tracker;
268 };
269
270 class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
271 RGWRados *store;
272 rgw_raw_obj obj;
273 string lock_name;
274 string cookie;
275 uint32_t duration_secs;
276
277 protected:
278 int _send_request() override;
279 public:
280 RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
281 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
282 const string& _name, const string& _cookie, uint32_t _duration_secs);
283 };
284
285 class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
286 RGWRados *store;
287 rgw_raw_obj obj;
288 string lock_name;
289 string cookie;
290
291 protected:
292 int _send_request() override;
293 public:
294 RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
295 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
296 const string& _name, const string& _cookie);
297 };
298
299 template <class T>
300 class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
301 RGWAsyncRadosProcessor *async_rados;
302 RGWSI_SysObj *svc;
303
304 rgw_raw_obj obj;
305 T *result;
306 /// on ENOENT, call handle_data() with an empty object instead of failing
307 const bool empty_on_enoent;
308 RGWObjVersionTracker *objv_tracker;
309 RGWAsyncGetSystemObj *req{nullptr};
310
311 public:
312 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
313 const rgw_raw_obj& _obj,
314 T *_result, bool empty_on_enoent = true,
315 RGWObjVersionTracker *objv_tracker = nullptr)
316 : RGWSimpleCoroutine(_svc->ctx()), async_rados(_async_rados), svc(_svc),
317 obj(_obj), result(_result),
318 empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
319 ~RGWSimpleRadosReadCR() override {
320 request_cleanup();
321 }
322
323 void request_cleanup() override {
324 if (req) {
325 req->finish();
326 req = NULL;
327 }
328 }
329
330 int send_request() override;
331 int request_complete() override;
332
333 virtual int handle_data(T& data) {
334 return 0;
335 }
336 };
337
338 template <class T>
339 int RGWSimpleRadosReadCR<T>::send_request()
340 {
341 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(), svc,
342 objv_tracker, obj, false, false);
343 async_rados->queue(req);
344 return 0;
345 }
346
347 template <class T>
348 int RGWSimpleRadosReadCR<T>::request_complete()
349 {
350 int ret = req->get_ret_status();
351 retcode = ret;
352 if (ret == -ENOENT && empty_on_enoent) {
353 *result = T();
354 } else {
355 if (ret < 0) {
356 return ret;
357 }
358 try {
359 auto iter = req->bl.cbegin();
360 if (iter.end()) {
361 // allow successful reads with empty buffers. ReadSyncStatus coroutines
362 // depend on this to be able to read without locking, because the
363 // cls lock from InitSyncStatus will create an empty object if it didn't
364 // exist
365 *result = T();
366 } else {
367 decode(*result, iter);
368 }
369 } catch (buffer::error& err) {
370 return -EIO;
371 }
372 }
373
374 return handle_data(*result);
375 }
376
377 class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
378 RGWAsyncRadosProcessor *async_rados;
379 RGWSI_SysObj *svc;
380
381 rgw_raw_obj obj;
382 map<string, bufferlist> *pattrs;
383 bool raw_attrs;
384 RGWAsyncGetSystemObj *req;
385
386 public:
387 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
388 const rgw_raw_obj& _obj,
389 map<string, bufferlist> *_pattrs, bool _raw_attrs) : RGWSimpleCoroutine(_svc->ctx()),
390 async_rados(_async_rados), svc(_svc),
391 obj(_obj),
392 pattrs(_pattrs),
393 raw_attrs(_raw_attrs),
394 req(NULL) {}
395 ~RGWSimpleRadosReadAttrsCR() override {
396 request_cleanup();
397 }
398
399 void request_cleanup() override {
400 if (req) {
401 req->finish();
402 req = NULL;
403 }
404 }
405
406 int send_request() override;
407 int request_complete() override;
408 };
409
410 template <class T>
411 class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
412 RGWAsyncRadosProcessor *async_rados;
413 RGWSI_SysObj *svc;
414 bufferlist bl;
415 rgw_raw_obj obj;
416 RGWObjVersionTracker *objv_tracker;
417 RGWAsyncPutSystemObj *req{nullptr};
418
419 public:
420 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
421 const rgw_raw_obj& _obj,
422 const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
423 : RGWSimpleCoroutine(_svc->ctx()), async_rados(_async_rados),
424 svc(_svc), obj(_obj), objv_tracker(objv_tracker) {
425 encode(_data, bl);
426 }
427
428 ~RGWSimpleRadosWriteCR() override {
429 request_cleanup();
430 }
431
432 void request_cleanup() override {
433 if (req) {
434 req->finish();
435 req = NULL;
436 }
437 }
438
439 int send_request() override {
440 req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
441 svc, objv_tracker, obj, false, std::move(bl));
442 async_rados->queue(req);
443 return 0;
444 }
445
446 int request_complete() override {
447 if (objv_tracker) { // copy the updated version
448 *objv_tracker = req->objv_tracker;
449 }
450 return req->get_ret_status();
451 }
452 };
453
454 class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
455 RGWAsyncRadosProcessor *async_rados;
456 RGWSI_SysObj *svc;
457 RGWObjVersionTracker *objv_tracker;
458
459 rgw_raw_obj obj;
460 map<string, bufferlist> attrs;
461 RGWAsyncPutSystemObjAttrs *req = nullptr;
462
463 public:
464 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados,
465 RGWSI_SysObj *_svc, const rgw_raw_obj& _obj,
466 map<string, bufferlist> _attrs,
467 RGWObjVersionTracker *objv_tracker = nullptr)
468 : RGWSimpleCoroutine(_svc->ctx()), async_rados(_async_rados),
469 svc(_svc), objv_tracker(objv_tracker), obj(_obj),
470 attrs(std::move(_attrs)) {
471 }
472 ~RGWSimpleRadosWriteAttrsCR() override {
473 request_cleanup();
474 }
475
476 void request_cleanup() override {
477 if (req) {
478 req->finish();
479 req = NULL;
480 }
481 }
482
483 int send_request() override {
484 req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
485 svc, objv_tracker, obj, std::move(attrs));
486 async_rados->queue(req);
487 return 0;
488 }
489
490 int request_complete() override {
491 if (objv_tracker) { // copy the updated version
492 *objv_tracker = req->objv_tracker;
493 }
494 return req->get_ret_status();
495 }
496 };
497
498 class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
499 RGWRados *store;
500 map<string, bufferlist> entries;
501
502 rgw_rados_ref ref;
503
504 rgw_raw_obj obj;
505
506 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
507
508 public:
509 RGWRadosSetOmapKeysCR(RGWRados *_store,
510 const rgw_raw_obj& _obj,
511 map<string, bufferlist>& _entries);
512
513 int send_request() override;
514 int request_complete() override;
515 };
516
517 class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
518 public:
519 struct Result {
520 rgw_rados_ref ref;
521 std::set<std::string> entries;
522 bool more = false;
523 };
524 using ResultPtr = std::shared_ptr<Result>;
525
526 RGWRadosGetOmapKeysCR(RGWRados *_store, const rgw_raw_obj& _obj,
527 const string& _marker, int _max_entries,
528 ResultPtr result);
529
530 int send_request() override;
531 int request_complete() override;
532
533 private:
534 RGWRados *store;
535 rgw_raw_obj obj;
536 string marker;
537 int max_entries;
538 ResultPtr result;
539 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
540 };
541
542 class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
543 RGWRados *store;
544
545 rgw_rados_ref ref;
546
547 set<string> keys;
548
549 rgw_raw_obj obj;
550
551 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
552
553 public:
554 RGWRadosRemoveOmapKeysCR(RGWRados *_store,
555 const rgw_raw_obj& _obj,
556 const set<string>& _keys);
557
558 int send_request() override;
559
560 int request_complete() override;
561 };
562
563 class RGWRadosRemoveCR : public RGWSimpleCoroutine {
564 RGWRados *store;
565 librados::IoCtx ioctx;
566 const rgw_raw_obj obj;
567 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
568
569 public:
570 RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
571
572 int send_request() override;
573 int request_complete() override;
574 };
575
576 class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
577 RGWAsyncRadosProcessor *async_rados;
578 RGWRados *store;
579 string lock_name;
580 string cookie;
581 uint32_t duration;
582
583 rgw_raw_obj obj;
584
585 RGWAsyncLockSystemObj *req;
586
587 public:
588 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
589 const rgw_raw_obj& _obj,
590 const string& _lock_name,
591 const string& _cookie,
592 uint32_t _duration);
593 ~RGWSimpleRadosLockCR() override {
594 request_cleanup();
595 }
596 void request_cleanup() override;
597
598 int send_request() override;
599 int request_complete() override;
600
601 static std::string gen_random_cookie(CephContext* cct) {
602 #define COOKIE_LEN 16
603 char buf[COOKIE_LEN + 1];
604 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
605 return buf;
606 }
607 };
608
609 class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
610 RGWAsyncRadosProcessor *async_rados;
611 RGWRados *store;
612 string lock_name;
613 string cookie;
614
615 rgw_raw_obj obj;
616
617 RGWAsyncUnlockSystemObj *req;
618
619 public:
620 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
621 const rgw_raw_obj& _obj,
622 const string& _lock_name,
623 const string& _cookie);
624 ~RGWSimpleRadosUnlockCR() override {
625 request_cleanup();
626 }
627 void request_cleanup() override;
628
629 int send_request() override;
630 int request_complete() override;
631 };
632
633 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
634
635 class RGWOmapAppend : public RGWConsumerCR<string> {
636 RGWAsyncRadosProcessor *async_rados;
637 RGWRados *store;
638
639 rgw_raw_obj obj;
640
641 bool going_down;
642
643 int num_pending_entries;
644 list<string> pending_entries;
645
646 map<string, bufferlist> entries;
647
648 uint64_t window_size;
649 uint64_t total_entries;
650 public:
651 RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
652 const rgw_raw_obj& _obj,
653 uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
654 int operate() override;
655 void flush_pending();
656 bool append(const string& s);
657 bool finish();
658
659 uint64_t get_total_entries() {
660 return total_entries;
661 }
662
663 const rgw_raw_obj& get_obj() {
664 return obj;
665 }
666 };
667
668 class RGWAsyncWait : public RGWAsyncRadosRequest {
669 CephContext *cct;
670 Mutex *lock;
671 Cond *cond;
672 utime_t interval;
673 protected:
674 int _send_request() override {
675 Mutex::Locker l(*lock);
676 return cond->WaitInterval(*lock, interval);
677 }
678 public:
679 RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
680 Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn),
681 cct(_cct),
682 lock(_lock), cond(_cond), interval(_secs, 0) {}
683
684 void wakeup() {
685 Mutex::Locker l(*lock);
686 cond->Signal();
687 }
688 };
689
690 class RGWWaitCR : public RGWSimpleCoroutine {
691 CephContext *cct;
692 RGWAsyncRadosProcessor *async_rados;
693 Mutex *lock;
694 Cond *cond;
695 int secs;
696
697 RGWAsyncWait *req;
698
699 public:
700 RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
701 Mutex *_lock, Cond *_cond,
702 int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
703 async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
704 }
705 ~RGWWaitCR() override {
706 request_cleanup();
707 }
708
709 void request_cleanup() override {
710 if (req) {
711 wakeup();
712 req->finish();
713 req = NULL;
714 }
715 }
716
717 int send_request() override {
718 req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct, lock, cond, secs);
719 async_rados->queue(req);
720 return 0;
721 }
722
723 int request_complete() override {
724 return req->get_ret_status();
725 }
726
727 void wakeup() {
728 req->wakeup();
729 }
730 };
731
732 class RGWShardedOmapCRManager {
733 RGWAsyncRadosProcessor *async_rados;
734 RGWRados *store;
735 RGWCoroutine *op;
736
737 int num_shards;
738
739 vector<RGWOmapAppend *> shards;
740 public:
741 RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix)
742 : async_rados(_async_rados),
743 store(_store), op(_op), num_shards(_num_shards) {
744 shards.reserve(num_shards);
745 for (int i = 0; i < num_shards; ++i) {
746 char buf[oid_prefix.size() + 16];
747 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
748 RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
749 shard->get();
750 shards.push_back(shard);
751 op->spawn(shard, false);
752 }
753 }
754
755 ~RGWShardedOmapCRManager() {
756 for (auto shard : shards) {
757 shard->put();
758 }
759 }
760
761 bool append(const string& entry, int shard_id) {
762 return shards[shard_id]->append(entry);
763 }
764 bool finish() {
765 bool success = true;
766 for (vector<RGWOmapAppend *>::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
767 success &= ((*iter)->finish() && (!(*iter)->is_error()));
768 }
769 return success;
770 }
771
772 uint64_t get_total_entries(int shard_id) {
773 return shards[shard_id]->get_total_entries();
774 }
775 };
776
777 class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
778 RGWRados *store;
779 const std::string oid;
780
781 protected:
782 int _send_request() override;
783 public:
784 RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
785 RGWRados *_store, const std::string& oid)
786 : RGWAsyncRadosRequest(caller, cn), store(_store), oid(oid) {}
787
788 RGWBucketInfo bucket_info;
789 };
790
791 class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
792 RGWAsyncRadosProcessor *async_rados;
793 RGWRados *store;
794 const std::string oid;
795 RGWBucketInfo *bucket_info;
796
797 RGWAsyncGetBucketInstanceInfo *req{nullptr};
798
799 public:
800 // metadata key constructor
801 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
802 const std::string& meta_key, RGWBucketInfo *_bucket_info)
803 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
804 oid(RGW_BUCKET_INSTANCE_MD_PREFIX + meta_key),
805 bucket_info(_bucket_info) {}
806 // rgw_bucket constructor
807 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
808 const rgw_bucket& bucket, RGWBucketInfo *_bucket_info)
809 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
810 oid(RGW_BUCKET_INSTANCE_MD_PREFIX + bucket.get_key(':')),
811 bucket_info(_bucket_info) {}
812 ~RGWGetBucketInstanceInfoCR() override {
813 request_cleanup();
814 }
815 void request_cleanup() override {
816 if (req) {
817 req->finish();
818 req = NULL;
819 }
820 }
821
822 int send_request() override {
823 req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, oid);
824 async_rados->queue(req);
825 return 0;
826 }
827 int request_complete() override {
828 if (bucket_info) {
829 *bucket_info = std::move(req->bucket_info);
830 }
831 return req->get_ret_status();
832 }
833 };
834
835 class RGWRadosBILogTrimCR : public RGWSimpleCoroutine {
836 RGWRados::BucketShard bs;
837 std::string start_marker;
838 std::string end_marker;
839 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
840 public:
841 RGWRadosBILogTrimCR(RGWRados *store, const RGWBucketInfo& bucket_info,
842 int shard_id, const std::string& start_marker,
843 const std::string& end_marker);
844
845 int send_request() override;
846 int request_complete() override;
847 };
848
849 class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
850 RGWRados *store;
851 string source_zone;
852
853 RGWBucketInfo bucket_info;
854 std::optional<rgw_placement_rule> dest_placement_rule;
855
856 rgw_obj_key key;
857 std::optional<rgw_obj_key> dest_key;
858 std::optional<uint64_t> versioned_epoch;
859
860 real_time src_mtime;
861
862 bool copy_if_newer;
863 rgw_zone_set zones_trace;
864 PerfCounters* counters;
865
866 protected:
867 int _send_request() override;
868 public:
869 RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
870 const string& _source_zone,
871 RGWBucketInfo& _bucket_info,
872 std::optional<rgw_placement_rule> _dest_placement_rule,
873 const rgw_obj_key& _key,
874 const std::optional<rgw_obj_key>& _dest_key,
875 std::optional<uint64_t> _versioned_epoch,
876 bool _if_newer, rgw_zone_set *_zones_trace,
877 PerfCounters* counters)
878 : RGWAsyncRadosRequest(caller, cn), store(_store),
879 source_zone(_source_zone),
880 bucket_info(_bucket_info),
881 dest_placement_rule(_dest_placement_rule),
882 key(_key),
883 dest_key(_dest_key),
884 versioned_epoch(_versioned_epoch),
885 copy_if_newer(_if_newer), counters(counters)
886 {
887 if (_zones_trace) {
888 zones_trace = *_zones_trace;
889 }
890 }
891 };
892
893 class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
894 CephContext *cct;
895 RGWAsyncRadosProcessor *async_rados;
896 RGWRados *store;
897 string source_zone;
898
899 RGWBucketInfo bucket_info;
900 std::optional<rgw_placement_rule> dest_placement_rule;
901
902 rgw_obj_key key;
903 std::optional<rgw_obj_key> dest_key;
904 std::optional<uint64_t> versioned_epoch;
905
906 real_time src_mtime;
907
908 bool copy_if_newer;
909
910 RGWAsyncFetchRemoteObj *req;
911 rgw_zone_set *zones_trace;
912 PerfCounters* counters;
913
914 public:
915 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
916 const string& _source_zone,
917 RGWBucketInfo& _bucket_info,
918 std::optional<rgw_placement_rule> _dest_placement_rule,
919 const rgw_obj_key& _key,
920 const std::optional<rgw_obj_key>& _dest_key,
921 std::optional<uint64_t> _versioned_epoch,
922 bool _if_newer, rgw_zone_set *_zones_trace,
923 PerfCounters* counters)
924 : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
925 async_rados(_async_rados), store(_store),
926 source_zone(_source_zone),
927 bucket_info(_bucket_info),
928 dest_placement_rule(_dest_placement_rule),
929 key(_key),
930 dest_key(_dest_key),
931 versioned_epoch(_versioned_epoch),
932 copy_if_newer(_if_newer), req(NULL),
933 zones_trace(_zones_trace), counters(counters) {}
934
935
936 ~RGWFetchRemoteObjCR() override {
937 request_cleanup();
938 }
939
940 void request_cleanup() override {
941 if (req) {
942 req->finish();
943 req = NULL;
944 }
945 }
946
947 int send_request() override {
948 req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store,
949 source_zone, bucket_info, dest_placement_rule,
950 key, dest_key, versioned_epoch, copy_if_newer,
951 zones_trace, counters);
952 async_rados->queue(req);
953 return 0;
954 }
955
956 int request_complete() override {
957 return req->get_ret_status();
958 }
959 };
960
961 class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
962 RGWRados *store;
963 string source_zone;
964
965 RGWBucketInfo bucket_info;
966
967 rgw_obj_key key;
968
969 ceph::real_time *pmtime;
970 uint64_t *psize;
971 string *petag;
972 map<string, bufferlist> *pattrs;
973 map<string, string> *pheaders;
974
975 protected:
976 int _send_request() override;
977 public:
978 RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
979 const string& _source_zone,
980 RGWBucketInfo& _bucket_info,
981 const rgw_obj_key& _key,
982 ceph::real_time *_pmtime,
983 uint64_t *_psize,
984 string *_petag,
985 map<string, bufferlist> *_pattrs,
986 map<string, string> *_pheaders) : RGWAsyncRadosRequest(caller, cn), store(_store),
987 source_zone(_source_zone),
988 bucket_info(_bucket_info),
989 key(_key),
990 pmtime(_pmtime),
991 psize(_psize),
992 petag(_petag),
993 pattrs(_pattrs),
994 pheaders(_pheaders) {}
995 };
996
997 class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
998 CephContext *cct;
999 RGWAsyncRadosProcessor *async_rados;
1000 RGWRados *store;
1001 string source_zone;
1002
1003 RGWBucketInfo bucket_info;
1004
1005 rgw_obj_key key;
1006
1007 ceph::real_time *pmtime;
1008 uint64_t *psize;
1009 string *petag;
1010 map<string, bufferlist> *pattrs;
1011 map<string, string> *pheaders;
1012
1013 RGWAsyncStatRemoteObj *req;
1014
1015 public:
1016 RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1017 const string& _source_zone,
1018 RGWBucketInfo& _bucket_info,
1019 const rgw_obj_key& _key,
1020 ceph::real_time *_pmtime,
1021 uint64_t *_psize,
1022 string *_petag,
1023 map<string, bufferlist> *_pattrs,
1024 map<string, string> *_pheaders) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
1025 async_rados(_async_rados), store(_store),
1026 source_zone(_source_zone),
1027 bucket_info(_bucket_info),
1028 key(_key),
1029 pmtime(_pmtime),
1030 psize(_psize),
1031 petag(_petag),
1032 pattrs(_pattrs),
1033 pheaders(_pheaders),
1034 req(NULL) {}
1035
1036
1037 ~RGWStatRemoteObjCR() override {
1038 request_cleanup();
1039 }
1040
1041 void request_cleanup() override {
1042 if (req) {
1043 req->finish();
1044 req = NULL;
1045 }
1046 }
1047
1048 int send_request() override {
1049 req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
1050 bucket_info, key, pmtime, psize, petag, pattrs, pheaders);
1051 async_rados->queue(req);
1052 return 0;
1053 }
1054
1055 int request_complete() override {
1056 return req->get_ret_status();
1057 }
1058 };
1059
1060 class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
1061 RGWRados *store;
1062 string source_zone;
1063
1064 RGWBucketInfo bucket_info;
1065
1066 rgw_obj_key key;
1067 string owner;
1068 string owner_display_name;
1069 bool versioned;
1070 uint64_t versioned_epoch;
1071 string marker_version_id;
1072
1073 bool del_if_older;
1074 ceph::real_time timestamp;
1075 rgw_zone_set zones_trace;
1076
1077 protected:
1078 int _send_request() override;
1079 public:
1080 RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
1081 const string& _source_zone,
1082 RGWBucketInfo& _bucket_info,
1083 const rgw_obj_key& _key,
1084 const string& _owner,
1085 const string& _owner_display_name,
1086 bool _versioned,
1087 uint64_t _versioned_epoch,
1088 bool _delete_marker,
1089 bool _if_older,
1090 real_time& _timestamp,
1091 rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
1092 source_zone(_source_zone),
1093 bucket_info(_bucket_info),
1094 key(_key),
1095 owner(_owner),
1096 owner_display_name(_owner_display_name),
1097 versioned(_versioned),
1098 versioned_epoch(_versioned_epoch),
1099 del_if_older(_if_older),
1100 timestamp(_timestamp) {
1101 if (_delete_marker) {
1102 marker_version_id = key.instance;
1103 }
1104
1105 if (_zones_trace) {
1106 zones_trace = *_zones_trace;
1107 }
1108 }
1109 };
1110
1111 class RGWRemoveObjCR : public RGWSimpleCoroutine {
1112 CephContext *cct;
1113 RGWAsyncRadosProcessor *async_rados;
1114 RGWRados *store;
1115 string source_zone;
1116
1117 RGWBucketInfo bucket_info;
1118
1119 rgw_obj_key key;
1120 bool versioned;
1121 uint64_t versioned_epoch;
1122 bool delete_marker;
1123 string owner;
1124 string owner_display_name;
1125
1126 bool del_if_older;
1127 real_time timestamp;
1128
1129 RGWAsyncRemoveObj *req;
1130
1131 rgw_zone_set *zones_trace;
1132
1133 public:
1134 RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1135 const string& _source_zone,
1136 RGWBucketInfo& _bucket_info,
1137 const rgw_obj_key& _key,
1138 bool _versioned,
1139 uint64_t _versioned_epoch,
1140 string *_owner,
1141 string *_owner_display_name,
1142 bool _delete_marker,
1143 real_time *_timestamp,
1144 rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
1145 async_rados(_async_rados), store(_store),
1146 source_zone(_source_zone),
1147 bucket_info(_bucket_info),
1148 key(_key),
1149 versioned(_versioned),
1150 versioned_epoch(_versioned_epoch),
1151 delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
1152 del_if_older = (_timestamp != NULL);
1153 if (_timestamp) {
1154 timestamp = *_timestamp;
1155 }
1156
1157 if (_owner) {
1158 owner = *_owner;
1159 }
1160
1161 if (_owner_display_name) {
1162 owner_display_name = *_owner_display_name;
1163 }
1164 }
1165 ~RGWRemoveObjCR() override {
1166 request_cleanup();
1167 }
1168
1169 void request_cleanup() override {
1170 if (req) {
1171 req->finish();
1172 req = NULL;
1173 }
1174 }
1175
1176 int send_request() override {
1177 req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
1178 key, owner, owner_display_name, versioned, versioned_epoch,
1179 delete_marker, del_if_older, timestamp, zones_trace);
1180 async_rados->queue(req);
1181 return 0;
1182 }
1183
1184 int request_complete() override {
1185 return req->get_ret_status();
1186 }
1187 };
1188
1189 class RGWContinuousLeaseCR : public RGWCoroutine {
1190 RGWAsyncRadosProcessor *async_rados;
1191 RGWRados *store;
1192
1193 const rgw_raw_obj obj;
1194
1195 const string lock_name;
1196 const string cookie;
1197
1198 int interval;
1199
1200 Mutex lock;
1201 std::atomic<bool> going_down = { false };
1202 bool locked{false};
1203
1204 RGWCoroutine *caller;
1205
1206 bool aborted{false};
1207
1208 public:
1209 RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1210 const rgw_raw_obj& _obj,
1211 const string& _lock_name, int _interval, RGWCoroutine *_caller)
1212 : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
1213 obj(_obj), lock_name(_lock_name),
1214 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
1215 interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
1216 {}
1217
1218 int operate() override;
1219
1220 bool is_locked() {
1221 Mutex::Locker l(lock);
1222 return locked;
1223 }
1224
1225 void set_locked(bool status) {
1226 Mutex::Locker l(lock);
1227 locked = status;
1228 }
1229
1230 void go_down() {
1231 going_down = true;
1232 wakeup();
1233 }
1234
1235 void abort() {
1236 aborted = true;
1237 }
1238 };
1239
1240 class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
1241 RGWRados *store;
1242 list<cls_log_entry> entries;
1243
1244 string oid;
1245
1246 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1247
1248 public:
1249 RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
1250 const cls_log_entry& entry);
1251
1252 int send_request() override;
1253 int request_complete() override;
1254 };
1255
1256 class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
1257 RGWRados *store;
1258 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1259 protected:
1260 std::string oid;
1261 real_time start_time;
1262 real_time end_time;
1263 std::string from_marker;
1264 std::string to_marker;
1265
1266 public:
1267 RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
1268 const real_time& start_time, const real_time& end_time,
1269 const std::string& from_marker,
1270 const std::string& to_marker);
1271
1272 int send_request() override;
1273 int request_complete() override;
1274 };
1275
1276 // wrapper to update last_trim_marker on success
1277 class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
1278 CephContext *cct;
1279 std::string *last_trim_marker;
1280 public:
1281 RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
1282 const std::string& to_marker, std::string *last_trim_marker);
1283 int request_complete() override;
1284 };
1285
1286 class RGWAsyncStatObj : public RGWAsyncRadosRequest {
1287 RGWRados *store;
1288 RGWBucketInfo bucket_info;
1289 rgw_obj obj;
1290 uint64_t *psize;
1291 real_time *pmtime;
1292 uint64_t *pepoch;
1293 RGWObjVersionTracker *objv_tracker;
1294 protected:
1295 int _send_request() override;
1296 public:
1297 RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store,
1298 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1299 real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
1300 RGWObjVersionTracker *objv_tracker = nullptr)
1301 : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize),
1302 pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
1303 };
1304
1305 class RGWStatObjCR : public RGWSimpleCoroutine {
1306 RGWRados *store;
1307 RGWAsyncRadosProcessor *async_rados;
1308 RGWBucketInfo bucket_info;
1309 rgw_obj obj;
1310 uint64_t *psize;
1311 real_time *pmtime;
1312 uint64_t *pepoch;
1313 RGWObjVersionTracker *objv_tracker;
1314 RGWAsyncStatObj *req = nullptr;
1315 public:
1316 RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
1317 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1318 real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
1319 RGWObjVersionTracker *objv_tracker = nullptr);
1320 ~RGWStatObjCR() override {
1321 request_cleanup();
1322 }
1323 void request_cleanup() override;
1324
1325 int send_request() override;
1326 int request_complete() override;
1327 };
1328
1329 /// coroutine wrapper for IoCtx::aio_notify()
1330 class RGWRadosNotifyCR : public RGWSimpleCoroutine {
1331 RGWRados *const store;
1332 const rgw_raw_obj obj;
1333 bufferlist request;
1334 const uint64_t timeout_ms;
1335 bufferlist *response;
1336 rgw_rados_ref ref;
1337 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1338
1339 public:
1340 RGWRadosNotifyCR(RGWRados *store, const rgw_raw_obj& obj,
1341 bufferlist& request, uint64_t timeout_ms,
1342 bufferlist *response);
1343
1344 int send_request() override;
1345 int request_complete() override;
1346 };
1347
1348 #endif