]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rados.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_cr_rados.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_RGW_CR_RADOS_H
5 #define CEPH_RGW_CR_RADOS_H
6
7 #include <boost/intrusive_ptr.hpp>
8 #include "include/ceph_assert.h"
9 #include "rgw_coroutine.h"
10 #include "rgw_rados.h"
11 #include "common/WorkQueue.h"
12 #include "common/Throttle.h"
13
14 #include <atomic>
15
16 #include "services/svc_sys_obj.h"
17
18 class RGWAsyncRadosRequest : public RefCountedObject {
19 RGWCoroutine *caller;
20 RGWAioCompletionNotifier *notifier;
21
22 int retcode;
23
24 Mutex lock;
25
26 protected:
27 virtual int _send_request() = 0;
28 public:
29 RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0),
30 lock("RGWAsyncRadosRequest::lock") {
31 }
32 ~RGWAsyncRadosRequest() override {
33 if (notifier) {
34 notifier->put();
35 }
36 }
37
38 void send_request() {
39 get();
40 retcode = _send_request();
41 {
42 Mutex::Locker l(lock);
43 if (notifier) {
44 notifier->cb(); // drops its own ref
45 notifier = nullptr;
46 }
47 }
48 put();
49 }
50
51 int get_ret_status() { return retcode; }
52
53 void finish() {
54 {
55 Mutex::Locker l(lock);
56 if (notifier) {
57 // we won't call notifier->cb() to drop its ref, so drop it here
58 notifier->put();
59 notifier = nullptr;
60 }
61 }
62 put();
63 }
64 };
65
66
67 class RGWAsyncRadosProcessor {
68 deque<RGWAsyncRadosRequest *> m_req_queue;
69 std::atomic<bool> going_down = { false };
70 protected:
71 RGWRados *store;
72 ThreadPool m_tp;
73 Throttle req_throttle;
74
75 struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
76 RGWAsyncRadosProcessor *processor;
77 RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
78 : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
79
80 bool _enqueue(RGWAsyncRadosRequest *req) override;
81 void _dequeue(RGWAsyncRadosRequest *req) override {
82 ceph_abort();
83 }
84 bool _empty() override;
85 RGWAsyncRadosRequest *_dequeue() override;
86 using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
87 void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
88 void _dump_queue();
89 void _clear() override {
90 ceph_assert(processor->m_req_queue.empty());
91 }
92 } req_wq;
93
94 public:
95 RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
96 ~RGWAsyncRadosProcessor() {}
97 void start();
98 void stop();
99 void handle_request(RGWAsyncRadosRequest *req);
100 void queue(RGWAsyncRadosRequest *req);
101
102 bool is_going_down() {
103 return going_down;
104 }
105 };
106
107 template <class P>
108 class RGWSimpleWriteOnlyAsyncCR : public RGWSimpleCoroutine {
109 RGWAsyncRadosProcessor *async_rados;
110 RGWRados *store;
111
112 P params;
113
114 class Request : public RGWAsyncRadosRequest {
115 RGWRados *store;
116 P params;
117 protected:
118 int _send_request() override;
119 public:
120 Request(RGWCoroutine *caller,
121 RGWAioCompletionNotifier *cn,
122 RGWRados *store,
123 const P& _params) : RGWAsyncRadosRequest(caller, cn),
124 store(store),
125 params(_params) {}
126 } *req{nullptr};
127
128 public:
129 RGWSimpleWriteOnlyAsyncCR(RGWAsyncRadosProcessor *_async_rados,
130 RGWRados *_store,
131 const P& _params) : RGWSimpleCoroutine(_store->ctx()),
132 async_rados(_async_rados),
133 store(_store),
134 params(_params) {}
135
136 ~RGWSimpleWriteOnlyAsyncCR() override {
137 request_cleanup();
138 }
139 void request_cleanup() override {
140 if (req) {
141 req->finish();
142 req = NULL;
143 }
144 }
145
146 int send_request() override {
147 req = new Request(this,
148 stack->create_completion_notifier(),
149 store,
150 params);
151
152 async_rados->queue(req);
153 return 0;
154 }
155 int request_complete() override {
156 return req->get_ret_status();
157 }
158 };
159
160
161 template <class P, class R>
162 class RGWSimpleAsyncCR : public RGWSimpleCoroutine {
163 RGWAsyncRadosProcessor *async_rados;
164 RGWRados *store;
165
166 P params;
167 std::shared_ptr<R> result;
168
169 class Request : public RGWAsyncRadosRequest {
170 RGWRados *store;
171 P params;
172 std::shared_ptr<R> result;
173 protected:
174 int _send_request() override;
175 public:
176 Request(RGWCoroutine *caller,
177 RGWAioCompletionNotifier *cn,
178 RGWRados *_store,
179 const P& _params,
180 std::shared_ptr<R>& _result) : RGWAsyncRadosRequest(caller, cn),
181 store(_store),
182 params(_params),
183 result(_result) {}
184 } *req{nullptr};
185
186 public:
187 RGWSimpleAsyncCR(RGWAsyncRadosProcessor *_async_rados,
188 RGWRados *_store,
189 const P& _params,
190 std::shared_ptr<R>& _result) : RGWSimpleCoroutine(_store->ctx()),
191 async_rados(_async_rados),
192 store(_store),
193 params(_params),
194 result(_result) {}
195
196 ~RGWSimpleAsyncCR() override {
197 request_cleanup();
198 }
199 void request_cleanup() override {
200 if (req) {
201 req->finish();
202 req = NULL;
203 }
204 }
205
206 int send_request() override {
207 req = new Request(this,
208 stack->create_completion_notifier(),
209 store,
210 params,
211 result);
212
213 async_rados->queue(req);
214 return 0;
215 }
216 int request_complete() override {
217 return req->get_ret_status();
218 }
219 };
220
221
222 class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
223 RGWSysObjectCtx obj_ctx;
224 RGWObjVersionTracker objv_tracker;
225 rgw_raw_obj obj;
226 const bool want_attrs;
227 const bool raw_attrs;
228 protected:
229 int _send_request() override;
230 public:
231 RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
232 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
233 bool want_attrs, bool raw_attrs);
234
235 bufferlist bl;
236 map<string, bufferlist> attrs;
237 };
238
239 class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
240 RGWSI_SysObj *svc;
241 rgw_raw_obj obj;
242 bool exclusive;
243 bufferlist bl;
244
245 protected:
246 int _send_request() override;
247 public:
248 RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
249 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
250 bool _exclusive, bufferlist _bl);
251
252 RGWObjVersionTracker objv_tracker;
253 };
254
255 class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
256 RGWSI_SysObj *svc;
257 rgw_raw_obj obj;
258 map<string, bufferlist> attrs;
259
260 protected:
261 int _send_request() override;
262 public:
263 RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
264 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
265 map<string, bufferlist> _attrs);
266
267 RGWObjVersionTracker objv_tracker;
268 };
269
270 class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
271 RGWRados *store;
272 rgw_raw_obj obj;
273 string lock_name;
274 string cookie;
275 uint32_t duration_secs;
276
277 protected:
278 int _send_request() override;
279 public:
280 RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
281 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
282 const string& _name, const string& _cookie, uint32_t _duration_secs);
283 };
284
285 class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
286 RGWRados *store;
287 rgw_raw_obj obj;
288 string lock_name;
289 string cookie;
290
291 protected:
292 int _send_request() override;
293 public:
294 RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
295 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
296 const string& _name, const string& _cookie);
297 };
298
299 template <class T>
300 class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
301 RGWAsyncRadosProcessor *async_rados;
302 RGWSI_SysObj *svc;
303
304 rgw_raw_obj obj;
305 T *result;
306 /// on ENOENT, call handle_data() with an empty object instead of failing
307 const bool empty_on_enoent;
308 RGWObjVersionTracker *objv_tracker;
309 RGWAsyncGetSystemObj *req{nullptr};
310
311 public:
312 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
313 const rgw_raw_obj& _obj,
314 T *_result, bool empty_on_enoent = true,
315 RGWObjVersionTracker *objv_tracker = nullptr)
316 : RGWSimpleCoroutine(_svc->ctx()), async_rados(_async_rados), svc(_svc),
317 obj(_obj), result(_result),
318 empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
319 ~RGWSimpleRadosReadCR() override {
320 request_cleanup();
321 }
322
323 void request_cleanup() override {
324 if (req) {
325 req->finish();
326 req = NULL;
327 }
328 }
329
330 int send_request() override;
331 int request_complete() override;
332
333 virtual int handle_data(T& data) {
334 return 0;
335 }
336 };
337
338 template <class T>
339 int RGWSimpleRadosReadCR<T>::send_request()
340 {
341 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(), svc,
342 objv_tracker, obj, false, false);
343 async_rados->queue(req);
344 return 0;
345 }
346
347 template <class T>
348 int RGWSimpleRadosReadCR<T>::request_complete()
349 {
350 int ret = req->get_ret_status();
351 retcode = ret;
352 if (ret == -ENOENT && empty_on_enoent) {
353 *result = T();
354 } else {
355 if (ret < 0) {
356 return ret;
357 }
358 try {
359 auto iter = req->bl.cbegin();
360 if (iter.end()) {
361 // allow successful reads with empty buffers. ReadSyncStatus coroutines
362 // depend on this to be able to read without locking, because the
363 // cls lock from InitSyncStatus will create an empty object if it didn't
364 // exist
365 *result = T();
366 } else {
367 decode(*result, iter);
368 }
369 } catch (buffer::error& err) {
370 return -EIO;
371 }
372 }
373
374 return handle_data(*result);
375 }
376
377 class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
378 RGWAsyncRadosProcessor *async_rados;
379 RGWSI_SysObj *svc;
380
381 rgw_raw_obj obj;
382 map<string, bufferlist> *pattrs;
383 bool raw_attrs;
384 RGWAsyncGetSystemObj *req;
385
386 public:
387 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
388 const rgw_raw_obj& _obj,
389 map<string, bufferlist> *_pattrs, bool _raw_attrs) : RGWSimpleCoroutine(_svc->ctx()),
390 async_rados(_async_rados), svc(_svc),
391 obj(_obj),
392 pattrs(_pattrs),
393 raw_attrs(_raw_attrs),
394 req(NULL) {}
395 ~RGWSimpleRadosReadAttrsCR() override {
396 request_cleanup();
397 }
398
399 void request_cleanup() override {
400 if (req) {
401 req->finish();
402 req = NULL;
403 }
404 }
405
406 int send_request() override;
407 int request_complete() override;
408 };
409
410 template <class T>
411 class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
412 RGWAsyncRadosProcessor *async_rados;
413 RGWSI_SysObj *svc;
414 bufferlist bl;
415 rgw_raw_obj obj;
416 RGWObjVersionTracker *objv_tracker;
417 RGWAsyncPutSystemObj *req{nullptr};
418
419 public:
420 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
421 const rgw_raw_obj& _obj,
422 const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
423 : RGWSimpleCoroutine(_svc->ctx()), async_rados(_async_rados),
424 svc(_svc), obj(_obj), objv_tracker(objv_tracker) {
425 encode(_data, bl);
426 }
427
428 ~RGWSimpleRadosWriteCR() override {
429 request_cleanup();
430 }
431
432 void request_cleanup() override {
433 if (req) {
434 req->finish();
435 req = NULL;
436 }
437 }
438
439 int send_request() override {
440 req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
441 svc, objv_tracker, obj, false, std::move(bl));
442 async_rados->queue(req);
443 return 0;
444 }
445
446 int request_complete() override {
447 if (objv_tracker) { // copy the updated version
448 *objv_tracker = req->objv_tracker;
449 }
450 return req->get_ret_status();
451 }
452 };
453
454 class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
455 RGWAsyncRadosProcessor *async_rados;
456 RGWSI_SysObj *svc;
457 RGWObjVersionTracker *objv_tracker;
458
459 rgw_raw_obj obj;
460 map<string, bufferlist> attrs;
461 RGWAsyncPutSystemObjAttrs *req = nullptr;
462
463 public:
464 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados,
465 RGWSI_SysObj *_svc, const rgw_raw_obj& _obj,
466 map<string, bufferlist> _attrs,
467 RGWObjVersionTracker *objv_tracker = nullptr)
468 : RGWSimpleCoroutine(_svc->ctx()), async_rados(_async_rados),
469 svc(_svc), objv_tracker(objv_tracker), obj(_obj),
470 attrs(std::move(_attrs)) {
471 }
472 ~RGWSimpleRadosWriteAttrsCR() override {
473 request_cleanup();
474 }
475
476 void request_cleanup() override {
477 if (req) {
478 req->finish();
479 req = NULL;
480 }
481 }
482
483 int send_request() override {
484 req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
485 svc, objv_tracker, obj, std::move(attrs));
486 async_rados->queue(req);
487 return 0;
488 }
489
490 int request_complete() override {
491 if (objv_tracker) { // copy the updated version
492 *objv_tracker = req->objv_tracker;
493 }
494 return req->get_ret_status();
495 }
496 };
497
498 class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
499 RGWRados *store;
500 map<string, bufferlist> entries;
501
502 rgw_rados_ref ref;
503
504 rgw_raw_obj obj;
505
506 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
507
508 public:
509 RGWRadosSetOmapKeysCR(RGWRados *_store,
510 const rgw_raw_obj& _obj,
511 map<string, bufferlist>& _entries);
512
513 int send_request() override;
514 int request_complete() override;
515 };
516
517 class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
518 public:
519 struct Result {
520 rgw_rados_ref ref;
521 std::set<std::string> entries;
522 bool more = false;
523 };
524 using ResultPtr = std::shared_ptr<Result>;
525
526 RGWRadosGetOmapKeysCR(RGWRados *_store, const rgw_raw_obj& _obj,
527 const string& _marker, int _max_entries,
528 ResultPtr result);
529
530 int send_request() override;
531 int request_complete() override;
532
533 private:
534 RGWRados *store;
535 rgw_raw_obj obj;
536 string marker;
537 int max_entries;
538 ResultPtr result;
539 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
540 };
541
542 class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
543 RGWRados *store;
544
545 rgw_rados_ref ref;
546
547 set<string> keys;
548
549 rgw_raw_obj obj;
550
551 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
552
553 public:
554 RGWRadosRemoveOmapKeysCR(RGWRados *_store,
555 const rgw_raw_obj& _obj,
556 const set<string>& _keys);
557
558 int send_request() override;
559
560 int request_complete() override;
561 };
562
563 class RGWRadosRemoveCR : public RGWSimpleCoroutine {
564 RGWRados *store;
565 librados::IoCtx ioctx;
566 const rgw_raw_obj obj;
567 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
568
569 public:
570 RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
571
572 int send_request() override;
573 int request_complete() override;
574 };
575
576 class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
577 RGWAsyncRadosProcessor *async_rados;
578 RGWRados *store;
579 string lock_name;
580 string cookie;
581 uint32_t duration;
582
583 rgw_raw_obj obj;
584
585 RGWAsyncLockSystemObj *req;
586
587 public:
588 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
589 const rgw_raw_obj& _obj,
590 const string& _lock_name,
591 const string& _cookie,
592 uint32_t _duration);
593 ~RGWSimpleRadosLockCR() override {
594 request_cleanup();
595 }
596 void request_cleanup() override;
597
598 int send_request() override;
599 int request_complete() override;
600
601 static std::string gen_random_cookie(CephContext* cct) {
602 #define COOKIE_LEN 16
603 char buf[COOKIE_LEN + 1];
604 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
605 return buf;
606 }
607 };
608
609 class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
610 RGWAsyncRadosProcessor *async_rados;
611 RGWRados *store;
612 string lock_name;
613 string cookie;
614
615 rgw_raw_obj obj;
616
617 RGWAsyncUnlockSystemObj *req;
618
619 public:
620 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
621 const rgw_raw_obj& _obj,
622 const string& _lock_name,
623 const string& _cookie);
624 ~RGWSimpleRadosUnlockCR() override {
625 request_cleanup();
626 }
627 void request_cleanup() override;
628
629 int send_request() override;
630 int request_complete() override;
631 };
632
633 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
634
635 class RGWOmapAppend : public RGWConsumerCR<string> {
636 RGWAsyncRadosProcessor *async_rados;
637 RGWRados *store;
638
639 rgw_raw_obj obj;
640
641 bool going_down;
642
643 int num_pending_entries;
644 list<string> pending_entries;
645
646 map<string, bufferlist> entries;
647
648 uint64_t window_size;
649 uint64_t total_entries;
650 public:
651 RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
652 const rgw_raw_obj& _obj,
653 uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
654 int operate() override;
655 void flush_pending();
656 bool append(const string& s);
657 bool finish();
658
659 uint64_t get_total_entries() {
660 return total_entries;
661 }
662
663 const rgw_raw_obj& get_obj() {
664 return obj;
665 }
666 };
667
668 class RGWAsyncWait : public RGWAsyncRadosRequest {
669 CephContext *cct;
670 Mutex *lock;
671 Cond *cond;
672 utime_t interval;
673 protected:
674 int _send_request() override {
675 Mutex::Locker l(*lock);
676 return cond->WaitInterval(*lock, interval);
677 }
678 public:
679 RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
680 Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn),
681 cct(_cct),
682 lock(_lock), cond(_cond), interval(_secs, 0) {}
683
684 void wakeup() {
685 Mutex::Locker l(*lock);
686 cond->Signal();
687 }
688 };
689
690 class RGWWaitCR : public RGWSimpleCoroutine {
691 CephContext *cct;
692 RGWAsyncRadosProcessor *async_rados;
693 Mutex *lock;
694 Cond *cond;
695 int secs;
696
697 RGWAsyncWait *req;
698
699 public:
700 RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
701 Mutex *_lock, Cond *_cond,
702 int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
703 async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
704 }
705 ~RGWWaitCR() override {
706 request_cleanup();
707 }
708
709 void request_cleanup() override {
710 if (req) {
711 wakeup();
712 req->finish();
713 req = NULL;
714 }
715 }
716
717 int send_request() override {
718 req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct, lock, cond, secs);
719 async_rados->queue(req);
720 return 0;
721 }
722
723 int request_complete() override {
724 return req->get_ret_status();
725 }
726
727 void wakeup() {
728 req->wakeup();
729 }
730 };
731
732 class RGWShardedOmapCRManager {
733 RGWAsyncRadosProcessor *async_rados;
734 RGWRados *store;
735 RGWCoroutine *op;
736
737 int num_shards;
738
739 vector<RGWOmapAppend *> shards;
740 public:
741 RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix)
742 : async_rados(_async_rados),
743 store(_store), op(_op), num_shards(_num_shards) {
744 shards.reserve(num_shards);
745 for (int i = 0; i < num_shards; ++i) {
746 char buf[oid_prefix.size() + 16];
747 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
748 RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
749 shard->get();
750 shards.push_back(shard);
751 op->spawn(shard, false);
752 }
753 }
754
755 ~RGWShardedOmapCRManager() {
756 for (auto shard : shards) {
757 shard->put();
758 }
759 }
760
761 bool append(const string& entry, int shard_id) {
762 return shards[shard_id]->append(entry);
763 }
764 bool finish() {
765 bool success = true;
766 for (vector<RGWOmapAppend *>::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
767 success &= ((*iter)->finish() && (!(*iter)->is_error()));
768 }
769 return success;
770 }
771
772 uint64_t get_total_entries(int shard_id) {
773 return shards[shard_id]->get_total_entries();
774 }
775 };
776
777 class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
778 RGWRados *store;
779 const std::string oid;
780
781 protected:
782 int _send_request() override;
783 public:
784 RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
785 RGWRados *_store, const std::string& oid)
786 : RGWAsyncRadosRequest(caller, cn), store(_store), oid(oid) {}
787
788 RGWBucketInfo bucket_info;
789 };
790
791 class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
792 RGWAsyncRadosProcessor *async_rados;
793 RGWRados *store;
794 const std::string oid;
795 RGWBucketInfo *bucket_info;
796
797 RGWAsyncGetBucketInstanceInfo *req{nullptr};
798
799 public:
800 // metadata key constructor
801 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
802 const std::string& meta_key, RGWBucketInfo *_bucket_info)
803 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
804 oid(RGW_BUCKET_INSTANCE_MD_PREFIX + meta_key),
805 bucket_info(_bucket_info) {}
806 // rgw_bucket constructor
807 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
808 const rgw_bucket& bucket, RGWBucketInfo *_bucket_info)
809 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
810 oid(RGW_BUCKET_INSTANCE_MD_PREFIX + bucket.get_key(':')),
811 bucket_info(_bucket_info) {}
812 ~RGWGetBucketInstanceInfoCR() override {
813 request_cleanup();
814 }
815 void request_cleanup() override {
816 if (req) {
817 req->finish();
818 req = NULL;
819 }
820 }
821
822 int send_request() override {
823 req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, oid);
824 async_rados->queue(req);
825 return 0;
826 }
827 int request_complete() override {
828 if (bucket_info) {
829 *bucket_info = std::move(req->bucket_info);
830 }
831 return req->get_ret_status();
832 }
833 };
834
835 class RGWRadosBILogTrimCR : public RGWSimpleCoroutine {
836 RGWRados::BucketShard bs;
837 std::string start_marker;
838 std::string end_marker;
839 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
840 public:
841 RGWRadosBILogTrimCR(RGWRados *store, const RGWBucketInfo& bucket_info,
842 int shard_id, const std::string& start_marker,
843 const std::string& end_marker);
844
845 int send_request() override;
846 int request_complete() override;
847 };
848
849 class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
850 RGWRados *store;
851 string source_zone;
852
853 RGWBucketInfo bucket_info;
854 std::optional<rgw_placement_rule> dest_placement_rule;
855
856 rgw_obj_key key;
857 std::optional<rgw_obj_key> dest_key;
858 std::optional<uint64_t> versioned_epoch;
859
860 real_time src_mtime;
861
862 bool copy_if_newer;
863 rgw_zone_set zones_trace;
864
865 protected:
866 int _send_request() override;
867 public:
868 RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
869 const string& _source_zone,
870 RGWBucketInfo& _bucket_info,
871 std::optional<rgw_placement_rule> _dest_placement_rule,
872 const rgw_obj_key& _key,
873 const std::optional<rgw_obj_key>& _dest_key,
874 std::optional<uint64_t> _versioned_epoch,
875 bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
876 source_zone(_source_zone),
877 bucket_info(_bucket_info),
878 dest_placement_rule(_dest_placement_rule),
879 key(_key),
880 dest_key(_dest_key),
881 versioned_epoch(_versioned_epoch),
882 copy_if_newer(_if_newer)
883 {
884 if (_zones_trace) {
885 zones_trace = *_zones_trace;
886 }
887 }
888 };
889
890 class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
891 CephContext *cct;
892 RGWAsyncRadosProcessor *async_rados;
893 RGWRados *store;
894 string source_zone;
895
896 RGWBucketInfo bucket_info;
897 std::optional<rgw_placement_rule> dest_placement_rule;
898
899 rgw_obj_key key;
900 std::optional<rgw_obj_key> dest_key;
901 std::optional<uint64_t> versioned_epoch;
902
903 real_time src_mtime;
904
905 bool copy_if_newer;
906
907 RGWAsyncFetchRemoteObj *req;
908 rgw_zone_set *zones_trace;
909
910 public:
911 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
912 const string& _source_zone,
913 RGWBucketInfo& _bucket_info,
914 std::optional<rgw_placement_rule> _dest_placement_rule,
915 const rgw_obj_key& _key,
916 const std::optional<rgw_obj_key>& _dest_key,
917 std::optional<uint64_t> _versioned_epoch,
918 bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
919 async_rados(_async_rados), store(_store),
920 source_zone(_source_zone),
921 bucket_info(_bucket_info),
922 dest_placement_rule(_dest_placement_rule),
923 key(_key),
924 dest_key(_dest_key),
925 versioned_epoch(_versioned_epoch),
926 copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
927
928
929 ~RGWFetchRemoteObjCR() override {
930 request_cleanup();
931 }
932
933 void request_cleanup() override {
934 if (req) {
935 req->finish();
936 req = NULL;
937 }
938 }
939
940 int send_request() override {
941 req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store,
942 source_zone, bucket_info, dest_placement_rule,
943 key, dest_key, versioned_epoch, copy_if_newer, zones_trace);
944 async_rados->queue(req);
945 return 0;
946 }
947
948 int request_complete() override {
949 return req->get_ret_status();
950 }
951 };
952
953 class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
954 RGWRados *store;
955 string source_zone;
956
957 RGWBucketInfo bucket_info;
958
959 rgw_obj_key key;
960
961 ceph::real_time *pmtime;
962 uint64_t *psize;
963 string *petag;
964 map<string, bufferlist> *pattrs;
965 map<string, string> *pheaders;
966
967 protected:
968 int _send_request() override;
969 public:
970 RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
971 const string& _source_zone,
972 RGWBucketInfo& _bucket_info,
973 const rgw_obj_key& _key,
974 ceph::real_time *_pmtime,
975 uint64_t *_psize,
976 string *_petag,
977 map<string, bufferlist> *_pattrs,
978 map<string, string> *_pheaders) : RGWAsyncRadosRequest(caller, cn), store(_store),
979 source_zone(_source_zone),
980 bucket_info(_bucket_info),
981 key(_key),
982 pmtime(_pmtime),
983 psize(_psize),
984 petag(_petag),
985 pattrs(_pattrs),
986 pheaders(_pheaders) {}
987 };
988
989 class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
990 CephContext *cct;
991 RGWAsyncRadosProcessor *async_rados;
992 RGWRados *store;
993 string source_zone;
994
995 RGWBucketInfo bucket_info;
996
997 rgw_obj_key key;
998
999 ceph::real_time *pmtime;
1000 uint64_t *psize;
1001 string *petag;
1002 map<string, bufferlist> *pattrs;
1003 map<string, string> *pheaders;
1004
1005 RGWAsyncStatRemoteObj *req;
1006
1007 public:
1008 RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1009 const string& _source_zone,
1010 RGWBucketInfo& _bucket_info,
1011 const rgw_obj_key& _key,
1012 ceph::real_time *_pmtime,
1013 uint64_t *_psize,
1014 string *_petag,
1015 map<string, bufferlist> *_pattrs,
1016 map<string, string> *_pheaders) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
1017 async_rados(_async_rados), store(_store),
1018 source_zone(_source_zone),
1019 bucket_info(_bucket_info),
1020 key(_key),
1021 pmtime(_pmtime),
1022 psize(_psize),
1023 petag(_petag),
1024 pattrs(_pattrs),
1025 pheaders(_pheaders),
1026 req(NULL) {}
1027
1028
1029 ~RGWStatRemoteObjCR() override {
1030 request_cleanup();
1031 }
1032
1033 void request_cleanup() override {
1034 if (req) {
1035 req->finish();
1036 req = NULL;
1037 }
1038 }
1039
1040 int send_request() override {
1041 req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
1042 bucket_info, key, pmtime, psize, petag, pattrs, pheaders);
1043 async_rados->queue(req);
1044 return 0;
1045 }
1046
1047 int request_complete() override {
1048 return req->get_ret_status();
1049 }
1050 };
1051
1052 class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
1053 RGWRados *store;
1054 string source_zone;
1055
1056 RGWBucketInfo bucket_info;
1057
1058 rgw_obj_key key;
1059 string owner;
1060 string owner_display_name;
1061 bool versioned;
1062 uint64_t versioned_epoch;
1063 string marker_version_id;
1064
1065 bool del_if_older;
1066 ceph::real_time timestamp;
1067 rgw_zone_set zones_trace;
1068
1069 protected:
1070 int _send_request() override;
1071 public:
1072 RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
1073 const string& _source_zone,
1074 RGWBucketInfo& _bucket_info,
1075 const rgw_obj_key& _key,
1076 const string& _owner,
1077 const string& _owner_display_name,
1078 bool _versioned,
1079 uint64_t _versioned_epoch,
1080 bool _delete_marker,
1081 bool _if_older,
1082 real_time& _timestamp,
1083 rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
1084 source_zone(_source_zone),
1085 bucket_info(_bucket_info),
1086 key(_key),
1087 owner(_owner),
1088 owner_display_name(_owner_display_name),
1089 versioned(_versioned),
1090 versioned_epoch(_versioned_epoch),
1091 del_if_older(_if_older),
1092 timestamp(_timestamp) {
1093 if (_delete_marker) {
1094 marker_version_id = key.instance;
1095 }
1096
1097 if (_zones_trace) {
1098 zones_trace = *_zones_trace;
1099 }
1100 }
1101 };
1102
1103 class RGWRemoveObjCR : public RGWSimpleCoroutine {
1104 CephContext *cct;
1105 RGWAsyncRadosProcessor *async_rados;
1106 RGWRados *store;
1107 string source_zone;
1108
1109 RGWBucketInfo bucket_info;
1110
1111 rgw_obj_key key;
1112 bool versioned;
1113 uint64_t versioned_epoch;
1114 bool delete_marker;
1115 string owner;
1116 string owner_display_name;
1117
1118 bool del_if_older;
1119 real_time timestamp;
1120
1121 RGWAsyncRemoveObj *req;
1122
1123 rgw_zone_set *zones_trace;
1124
1125 public:
1126 RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1127 const string& _source_zone,
1128 RGWBucketInfo& _bucket_info,
1129 const rgw_obj_key& _key,
1130 bool _versioned,
1131 uint64_t _versioned_epoch,
1132 string *_owner,
1133 string *_owner_display_name,
1134 bool _delete_marker,
1135 real_time *_timestamp,
1136 rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
1137 async_rados(_async_rados), store(_store),
1138 source_zone(_source_zone),
1139 bucket_info(_bucket_info),
1140 key(_key),
1141 versioned(_versioned),
1142 versioned_epoch(_versioned_epoch),
1143 delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
1144 del_if_older = (_timestamp != NULL);
1145 if (_timestamp) {
1146 timestamp = *_timestamp;
1147 }
1148
1149 if (_owner) {
1150 owner = *_owner;
1151 }
1152
1153 if (_owner_display_name) {
1154 owner_display_name = *_owner_display_name;
1155 }
1156 }
1157 ~RGWRemoveObjCR() override {
1158 request_cleanup();
1159 }
1160
1161 void request_cleanup() override {
1162 if (req) {
1163 req->finish();
1164 req = NULL;
1165 }
1166 }
1167
1168 int send_request() override {
1169 req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
1170 key, owner, owner_display_name, versioned, versioned_epoch,
1171 delete_marker, del_if_older, timestamp, zones_trace);
1172 async_rados->queue(req);
1173 return 0;
1174 }
1175
1176 int request_complete() override {
1177 return req->get_ret_status();
1178 }
1179 };
1180
1181 class RGWContinuousLeaseCR : public RGWCoroutine {
1182 RGWAsyncRadosProcessor *async_rados;
1183 RGWRados *store;
1184
1185 const rgw_raw_obj obj;
1186
1187 const string lock_name;
1188 const string cookie;
1189
1190 int interval;
1191
1192 Mutex lock;
1193 std::atomic<bool> going_down = { false };
1194 bool locked{false};
1195
1196 RGWCoroutine *caller;
1197
1198 bool aborted{false};
1199
1200 public:
1201 RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1202 const rgw_raw_obj& _obj,
1203 const string& _lock_name, int _interval, RGWCoroutine *_caller)
1204 : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
1205 obj(_obj), lock_name(_lock_name),
1206 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
1207 interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
1208 {}
1209
1210 int operate() override;
1211
1212 bool is_locked() {
1213 Mutex::Locker l(lock);
1214 return locked;
1215 }
1216
1217 void set_locked(bool status) {
1218 Mutex::Locker l(lock);
1219 locked = status;
1220 }
1221
1222 void go_down() {
1223 going_down = true;
1224 wakeup();
1225 }
1226
1227 void abort() {
1228 aborted = true;
1229 }
1230 };
1231
1232 class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
1233 RGWRados *store;
1234 list<cls_log_entry> entries;
1235
1236 string oid;
1237
1238 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1239
1240 public:
1241 RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
1242 const cls_log_entry& entry);
1243
1244 int send_request() override;
1245 int request_complete() override;
1246 };
1247
1248 class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
1249 RGWRados *store;
1250 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1251 protected:
1252 std::string oid;
1253 real_time start_time;
1254 real_time end_time;
1255 std::string from_marker;
1256 std::string to_marker;
1257
1258 public:
1259 RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
1260 const real_time& start_time, const real_time& end_time,
1261 const std::string& from_marker,
1262 const std::string& to_marker);
1263
1264 int send_request() override;
1265 int request_complete() override;
1266 };
1267
1268 // wrapper to update last_trim_marker on success
1269 class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
1270 CephContext *cct;
1271 std::string *last_trim_marker;
1272 public:
1273 RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
1274 const std::string& to_marker, std::string *last_trim_marker);
1275 int request_complete() override;
1276 };
1277
1278 class RGWAsyncStatObj : public RGWAsyncRadosRequest {
1279 RGWRados *store;
1280 RGWBucketInfo bucket_info;
1281 rgw_obj obj;
1282 uint64_t *psize;
1283 real_time *pmtime;
1284 uint64_t *pepoch;
1285 RGWObjVersionTracker *objv_tracker;
1286 protected:
1287 int _send_request() override;
1288 public:
1289 RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store,
1290 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1291 real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
1292 RGWObjVersionTracker *objv_tracker = nullptr)
1293 : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize),
1294 pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
1295 };
1296
1297 class RGWStatObjCR : public RGWSimpleCoroutine {
1298 RGWRados *store;
1299 RGWAsyncRadosProcessor *async_rados;
1300 RGWBucketInfo bucket_info;
1301 rgw_obj obj;
1302 uint64_t *psize;
1303 real_time *pmtime;
1304 uint64_t *pepoch;
1305 RGWObjVersionTracker *objv_tracker;
1306 RGWAsyncStatObj *req = nullptr;
1307 public:
1308 RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
1309 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1310 real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
1311 RGWObjVersionTracker *objv_tracker = nullptr);
1312 ~RGWStatObjCR() override {
1313 request_cleanup();
1314 }
1315 void request_cleanup() override;
1316
1317 int send_request() override;
1318 int request_complete() override;
1319 };
1320
1321 /// coroutine wrapper for IoCtx::aio_notify()
1322 class RGWRadosNotifyCR : public RGWSimpleCoroutine {
1323 RGWRados *const store;
1324 const rgw_raw_obj obj;
1325 bufferlist request;
1326 const uint64_t timeout_ms;
1327 bufferlist *response;
1328 rgw_rados_ref ref;
1329 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1330
1331 public:
1332 RGWRadosNotifyCR(RGWRados *store, const rgw_raw_obj& obj,
1333 bufferlist& request, uint64_t timeout_ms,
1334 bufferlist *response);
1335
1336 int send_request() override;
1337 int request_complete() override;
1338 };
1339
1340 #endif