]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rados.h
ac267b618a7f38c7633065ad753ecd7fc397200f
[ceph.git] / ceph / src / rgw / rgw_cr_rados.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef CEPH_RGW_CR_RADOS_H
5 #define CEPH_RGW_CR_RADOS_H
6
7 #include <boost/intrusive_ptr.hpp>
8 #include "include/ceph_assert.h"
9 #include "rgw_coroutine.h"
10 #include "rgw_sal.h"
11 #include "rgw_sal_rados.h"
12 #include "common/WorkQueue.h"
13 #include "common/Throttle.h"
14
15 #include <atomic>
16
17 #include "services/svc_sys_obj.h"
18 #include "services/svc_bucket.h"
19
20 class RGWAsyncRadosRequest : public RefCountedObject {
21 RGWCoroutine *caller;
22 RGWAioCompletionNotifier *notifier;
23
24 int retcode;
25
26 ceph::mutex lock = ceph::make_mutex("RGWAsyncRadosRequest::lock");
27
28 protected:
29 virtual int _send_request() = 0;
30 public:
31 RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn)
32 : caller(_caller), notifier(_cn), retcode(0) {
33 }
34 ~RGWAsyncRadosRequest() override {
35 if (notifier) {
36 notifier->put();
37 }
38 }
39
40 void send_request() {
41 get();
42 retcode = _send_request();
43 {
44 std::lock_guard l{lock};
45 if (notifier) {
46 notifier->cb(); // drops its own ref
47 notifier = nullptr;
48 }
49 }
50 put();
51 }
52
53 int get_ret_status() { return retcode; }
54
55 void finish() {
56 {
57 std::lock_guard l{lock};
58 if (notifier) {
59 // we won't call notifier->cb() to drop its ref, so drop it here
60 notifier->put();
61 notifier = nullptr;
62 }
63 }
64 put();
65 }
66 };
67
68
69 class RGWAsyncRadosProcessor {
70 deque<RGWAsyncRadosRequest *> m_req_queue;
71 std::atomic<bool> going_down = { false };
72 protected:
73 CephContext *cct;
74 ThreadPool m_tp;
75 Throttle req_throttle;
76
77 struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
78 RGWAsyncRadosProcessor *processor;
79 RGWWQ(RGWAsyncRadosProcessor *p,
80 ceph::timespan timeout, ceph::timespan suicide_timeout,
81 ThreadPool *tp)
82 : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
83
84 bool _enqueue(RGWAsyncRadosRequest *req) override;
85 void _dequeue(RGWAsyncRadosRequest *req) override {
86 ceph_abort();
87 }
88 bool _empty() override;
89 RGWAsyncRadosRequest *_dequeue() override;
90 using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
91 void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
92 void _dump_queue();
93 void _clear() override {
94 ceph_assert(processor->m_req_queue.empty());
95 }
96 } req_wq;
97
98 public:
99 RGWAsyncRadosProcessor(CephContext *_cct, int num_threads);
100 ~RGWAsyncRadosProcessor() {}
101 void start();
102 void stop();
103 void handle_request(RGWAsyncRadosRequest *req);
104 void queue(RGWAsyncRadosRequest *req);
105
106 bool is_going_down() {
107 return going_down;
108 }
109 };
110
111 template <class P>
112 class RGWSimpleWriteOnlyAsyncCR : public RGWSimpleCoroutine {
113 RGWAsyncRadosProcessor *async_rados;
114 rgw::sal::RGWRadosStore *store;
115
116 P params;
117 const DoutPrefixProvider *dpp;
118
119 class Request : public RGWAsyncRadosRequest {
120 rgw::sal::RGWRadosStore *store;
121 P params;
122 const DoutPrefixProvider *dpp;
123 protected:
124 int _send_request() override;
125 public:
126 Request(RGWCoroutine *caller,
127 RGWAioCompletionNotifier *cn,
128 rgw::sal::RGWRadosStore *store,
129 const P& _params,
130 const DoutPrefixProvider *dpp) : RGWAsyncRadosRequest(caller, cn),
131 store(store),
132 params(_params),
133 dpp(dpp) {}
134 } *req{nullptr};
135
136 public:
137 RGWSimpleWriteOnlyAsyncCR(RGWAsyncRadosProcessor *_async_rados,
138 rgw::sal::RGWRadosStore *_store,
139 const P& _params,
140 const DoutPrefixProvider *_dpp) : RGWSimpleCoroutine(_store->ctx()),
141 async_rados(_async_rados),
142 store(_store),
143 params(_params),
144 dpp(_dpp) {}
145
146 ~RGWSimpleWriteOnlyAsyncCR() override {
147 request_cleanup();
148 }
149 void request_cleanup() override {
150 if (req) {
151 req->finish();
152 req = NULL;
153 }
154 }
155
156 int send_request() override {
157 req = new Request(this,
158 stack->create_completion_notifier(),
159 store,
160 params,
161 dpp);
162
163 async_rados->queue(req);
164 return 0;
165 }
166 int request_complete() override {
167 return req->get_ret_status();
168 }
169 };
170
171
172 template <class P, class R>
173 class RGWSimpleAsyncCR : public RGWSimpleCoroutine {
174 RGWAsyncRadosProcessor *async_rados;
175 rgw::sal::RGWRadosStore *store;
176
177 P params;
178 std::shared_ptr<R> result;
179
180 class Request : public RGWAsyncRadosRequest {
181 rgw::sal::RGWRadosStore *store;
182 P params;
183 std::shared_ptr<R> result;
184 protected:
185 int _send_request() override;
186 public:
187 Request(RGWCoroutine *caller,
188 RGWAioCompletionNotifier *cn,
189 rgw::sal::RGWRadosStore *_store,
190 const P& _params,
191 std::shared_ptr<R>& _result) : RGWAsyncRadosRequest(caller, cn),
192 store(_store),
193 params(_params),
194 result(_result) {}
195 } *req{nullptr};
196
197 public:
198 RGWSimpleAsyncCR(RGWAsyncRadosProcessor *_async_rados,
199 rgw::sal::RGWRadosStore *_store,
200 const P& _params,
201 std::shared_ptr<R>& _result) : RGWSimpleCoroutine(_store->ctx()),
202 async_rados(_async_rados),
203 store(_store),
204 params(_params),
205 result(_result) {}
206
207 ~RGWSimpleAsyncCR() override {
208 request_cleanup();
209 }
210 void request_cleanup() override {
211 if (req) {
212 req->finish();
213 req = NULL;
214 }
215 }
216
217 int send_request() override {
218 req = new Request(this,
219 stack->create_completion_notifier(),
220 store,
221 params,
222 result);
223
224 async_rados->queue(req);
225 return 0;
226 }
227 int request_complete() override {
228 return req->get_ret_status();
229 }
230 };
231
232 class RGWGenericAsyncCR : public RGWSimpleCoroutine {
233 RGWAsyncRadosProcessor *async_rados;
234 rgw::sal::RGWRadosStore *store;
235
236
237 public:
238 class Action {
239 public:
240 virtual ~Action() {}
241 virtual int operate() = 0;
242 };
243
244 private:
245 std::shared_ptr<Action> action;
246
247 class Request : public RGWAsyncRadosRequest {
248 std::shared_ptr<Action> action;
249 protected:
250 int _send_request() override {
251 if (!action) {
252 return 0;
253 }
254 return action->operate();
255 }
256 public:
257 Request(RGWCoroutine *caller,
258 RGWAioCompletionNotifier *cn,
259 std::shared_ptr<Action>& _action) : RGWAsyncRadosRequest(caller, cn),
260 action(_action) {}
261 } *req{nullptr};
262
263 public:
264 RGWGenericAsyncCR(CephContext *_cct,
265 RGWAsyncRadosProcessor *_async_rados,
266 std::shared_ptr<Action>& _action) : RGWSimpleCoroutine(_cct),
267 async_rados(_async_rados),
268 action(_action) {}
269 template<typename T>
270 RGWGenericAsyncCR(CephContext *_cct,
271 RGWAsyncRadosProcessor *_async_rados,
272 std::shared_ptr<T>& _action) : RGWSimpleCoroutine(_cct),
273 async_rados(_async_rados),
274 action(std::static_pointer_cast<Action>(_action)) {}
275
276 ~RGWGenericAsyncCR() override {
277 request_cleanup();
278 }
279 void request_cleanup() override {
280 if (req) {
281 req->finish();
282 req = NULL;
283 }
284 }
285
286 int send_request() override {
287 req = new Request(this,
288 stack->create_completion_notifier(),
289 action);
290
291 async_rados->queue(req);
292 return 0;
293 }
294 int request_complete() override {
295 return req->get_ret_status();
296 }
297 };
298
299
300 class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
301 RGWSysObjectCtx obj_ctx;
302 rgw_raw_obj obj;
303 const bool want_attrs;
304 const bool raw_attrs;
305 protected:
306 int _send_request() override;
307 public:
308 RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
309 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
310 bool want_attrs, bool raw_attrs);
311
312 bufferlist bl;
313 map<string, bufferlist> attrs;
314 RGWObjVersionTracker objv_tracker;
315 };
316
317 class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
318 RGWSI_SysObj *svc;
319 rgw_raw_obj obj;
320 bool exclusive;
321 bufferlist bl;
322
323 protected:
324 int _send_request() override;
325 public:
326 RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
327 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
328 bool _exclusive, bufferlist _bl);
329
330 RGWObjVersionTracker objv_tracker;
331 };
332
333 class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
334 RGWSI_SysObj *svc;
335 rgw_raw_obj obj;
336 map<string, bufferlist> attrs;
337
338 protected:
339 int _send_request() override;
340 public:
341 RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
342 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
343 map<string, bufferlist> _attrs);
344
345 RGWObjVersionTracker objv_tracker;
346 };
347
348 class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
349 rgw::sal::RGWRadosStore *store;
350 rgw_raw_obj obj;
351 string lock_name;
352 string cookie;
353 uint32_t duration_secs;
354
355 protected:
356 int _send_request() override;
357 public:
358 RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
359 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
360 const string& _name, const string& _cookie, uint32_t _duration_secs);
361 };
362
363 class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
364 rgw::sal::RGWRadosStore *store;
365 rgw_raw_obj obj;
366 string lock_name;
367 string cookie;
368
369 protected:
370 int _send_request() override;
371 public:
372 RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
373 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
374 const string& _name, const string& _cookie);
375 };
376
377 template <class T>
378 class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
379 RGWAsyncRadosProcessor *async_rados;
380 RGWSI_SysObj *svc;
381
382 rgw_raw_obj obj;
383 T *result;
384 /// on ENOENT, call handle_data() with an empty object instead of failing
385 const bool empty_on_enoent;
386 RGWObjVersionTracker *objv_tracker;
387 RGWAsyncGetSystemObj *req{nullptr};
388
389 public:
390 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
391 const rgw_raw_obj& _obj,
392 T *_result, bool empty_on_enoent = true,
393 RGWObjVersionTracker *objv_tracker = nullptr)
394 : RGWSimpleCoroutine(_svc->ctx()), async_rados(_async_rados), svc(_svc),
395 obj(_obj), result(_result),
396 empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
397 ~RGWSimpleRadosReadCR() override {
398 request_cleanup();
399 }
400
401 void request_cleanup() override {
402 if (req) {
403 req->finish();
404 req = NULL;
405 }
406 }
407
408 int send_request() override;
409 int request_complete() override;
410
411 virtual int handle_data(T& data) {
412 return 0;
413 }
414 };
415
416 template <class T>
417 int RGWSimpleRadosReadCR<T>::send_request()
418 {
419 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(), svc,
420 objv_tracker, obj, false, false);
421 async_rados->queue(req);
422 return 0;
423 }
424
425 template <class T>
426 int RGWSimpleRadosReadCR<T>::request_complete()
427 {
428 int ret = req->get_ret_status();
429 retcode = ret;
430 if (ret == -ENOENT && empty_on_enoent) {
431 *result = T();
432 } else {
433 if (ret < 0) {
434 return ret;
435 }
436 try {
437 auto iter = req->bl.cbegin();
438 if (iter.end()) {
439 // allow successful reads with empty buffers. ReadSyncStatus coroutines
440 // depend on this to be able to read without locking, because the
441 // cls lock from InitSyncStatus will create an empty object if it didn't
442 // exist
443 *result = T();
444 } else {
445 decode(*result, iter);
446 }
447 } catch (buffer::error& err) {
448 return -EIO;
449 }
450 }
451
452 return handle_data(*result);
453 }
454
455 class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
456 RGWAsyncRadosProcessor *async_rados;
457 RGWSI_SysObj *svc;
458
459 rgw_raw_obj obj;
460 map<string, bufferlist> *pattrs;
461 bool raw_attrs;
462 RGWObjVersionTracker* objv_tracker;
463 RGWAsyncGetSystemObj *req = nullptr;
464
465 public:
466 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
467 const rgw_raw_obj& _obj, map<string, bufferlist> *_pattrs,
468 bool _raw_attrs, RGWObjVersionTracker* objv_tracker = nullptr)
469 : RGWSimpleCoroutine(_svc->ctx()),
470 async_rados(_async_rados), svc(_svc),
471 obj(_obj),
472 pattrs(_pattrs),
473 raw_attrs(_raw_attrs),
474 objv_tracker(objv_tracker)
475 {}
476 ~RGWSimpleRadosReadAttrsCR() override {
477 request_cleanup();
478 }
479
480 void request_cleanup() override {
481 if (req) {
482 req->finish();
483 req = NULL;
484 }
485 }
486
487 int send_request() override;
488 int request_complete() override;
489 };
490
491 template <class T>
492 class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
493 RGWAsyncRadosProcessor *async_rados;
494 RGWSI_SysObj *svc;
495 bufferlist bl;
496 rgw_raw_obj obj;
497 RGWObjVersionTracker *objv_tracker;
498 RGWAsyncPutSystemObj *req{nullptr};
499
500 public:
501 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
502 const rgw_raw_obj& _obj,
503 const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
504 : RGWSimpleCoroutine(_svc->ctx()), async_rados(_async_rados),
505 svc(_svc), obj(_obj), objv_tracker(objv_tracker) {
506 encode(_data, bl);
507 }
508
509 ~RGWSimpleRadosWriteCR() override {
510 request_cleanup();
511 }
512
513 void request_cleanup() override {
514 if (req) {
515 req->finish();
516 req = NULL;
517 }
518 }
519
520 int send_request() override {
521 req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
522 svc, objv_tracker, obj, false, std::move(bl));
523 async_rados->queue(req);
524 return 0;
525 }
526
527 int request_complete() override {
528 if (objv_tracker) { // copy the updated version
529 *objv_tracker = req->objv_tracker;
530 }
531 return req->get_ret_status();
532 }
533 };
534
535 class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
536 RGWAsyncRadosProcessor *async_rados;
537 RGWSI_SysObj *svc;
538 RGWObjVersionTracker *objv_tracker;
539
540 rgw_raw_obj obj;
541 map<string, bufferlist> attrs;
542 RGWAsyncPutSystemObjAttrs *req = nullptr;
543
544 public:
545 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados,
546 RGWSI_SysObj *_svc, const rgw_raw_obj& _obj,
547 map<string, bufferlist> _attrs,
548 RGWObjVersionTracker *objv_tracker = nullptr)
549 : RGWSimpleCoroutine(_svc->ctx()), async_rados(_async_rados),
550 svc(_svc), objv_tracker(objv_tracker), obj(_obj),
551 attrs(std::move(_attrs)) {
552 }
553 ~RGWSimpleRadosWriteAttrsCR() override {
554 request_cleanup();
555 }
556
557 void request_cleanup() override {
558 if (req) {
559 req->finish();
560 req = NULL;
561 }
562 }
563
564 int send_request() override {
565 req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
566 svc, objv_tracker, obj, std::move(attrs));
567 async_rados->queue(req);
568 return 0;
569 }
570
571 int request_complete() override {
572 if (objv_tracker) { // copy the updated version
573 *objv_tracker = req->objv_tracker;
574 }
575 return req->get_ret_status();
576 }
577 };
578
579 class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
580 rgw::sal::RGWRadosStore *store;
581 map<string, bufferlist> entries;
582
583 rgw_rados_ref ref;
584
585 rgw_raw_obj obj;
586
587 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
588
589 public:
590 RGWRadosSetOmapKeysCR(rgw::sal::RGWRadosStore *_store,
591 const rgw_raw_obj& _obj,
592 map<string, bufferlist>& _entries);
593
594 int send_request() override;
595 int request_complete() override;
596 };
597
598 class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
599 public:
600 struct Result {
601 rgw_rados_ref ref;
602 std::set<std::string> entries;
603 bool more = false;
604 };
605 using ResultPtr = std::shared_ptr<Result>;
606
607 RGWRadosGetOmapKeysCR(rgw::sal::RGWRadosStore *_store, const rgw_raw_obj& _obj,
608 const string& _marker, int _max_entries,
609 ResultPtr result);
610
611 int send_request() override;
612 int request_complete() override;
613
614 private:
615 rgw::sal::RGWRadosStore *store;
616 rgw_raw_obj obj;
617 string marker;
618 int max_entries;
619 ResultPtr result;
620 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
621 };
622
623 class RGWRadosGetOmapValsCR : public RGWSimpleCoroutine {
624 public:
625 struct Result {
626 rgw_rados_ref ref;
627 std::map<std::string, bufferlist> entries;
628 bool more = false;
629 };
630 using ResultPtr = std::shared_ptr<Result>;
631
632 RGWRadosGetOmapValsCR(rgw::sal::RGWRadosStore *_store, const rgw_raw_obj& _obj,
633 const string& _marker, int _max_entries,
634 ResultPtr result);
635
636 int send_request() override;
637 int request_complete() override;
638
639 private:
640 rgw::sal::RGWRadosStore *store;
641 rgw_raw_obj obj;
642 string marker;
643 int max_entries;
644 ResultPtr result;
645 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
646 };
647
648 class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
649 rgw::sal::RGWRadosStore *store;
650
651 rgw_rados_ref ref;
652
653 set<string> keys;
654
655 rgw_raw_obj obj;
656
657 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
658
659 public:
660 RGWRadosRemoveOmapKeysCR(rgw::sal::RGWRadosStore *_store,
661 const rgw_raw_obj& _obj,
662 const set<string>& _keys);
663
664 int send_request() override;
665
666 int request_complete() override;
667 };
668
669 class RGWRadosRemoveCR : public RGWSimpleCoroutine {
670 rgw::sal::RGWRadosStore *store;
671 librados::IoCtx ioctx;
672 const rgw_raw_obj obj;
673 RGWObjVersionTracker* objv_tracker;
674 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
675
676 public:
677 RGWRadosRemoveCR(rgw::sal::RGWRadosStore *store, const rgw_raw_obj& obj,
678 RGWObjVersionTracker* objv_tracker = nullptr);
679
680 int send_request() override;
681 int request_complete() override;
682 };
683
684 class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
685 RGWAsyncRadosProcessor *async_rados;
686 rgw::sal::RGWRadosStore *store;
687 string lock_name;
688 string cookie;
689 uint32_t duration;
690
691 rgw_raw_obj obj;
692
693 RGWAsyncLockSystemObj *req;
694
695 public:
696 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
697 const rgw_raw_obj& _obj,
698 const string& _lock_name,
699 const string& _cookie,
700 uint32_t _duration);
701 ~RGWSimpleRadosLockCR() override {
702 request_cleanup();
703 }
704 void request_cleanup() override;
705
706 int send_request() override;
707 int request_complete() override;
708
709 static std::string gen_random_cookie(CephContext* cct) {
710 #define COOKIE_LEN 16
711 char buf[COOKIE_LEN + 1];
712 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
713 return buf;
714 }
715 };
716
717 class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
718 RGWAsyncRadosProcessor *async_rados;
719 rgw::sal::RGWRadosStore *store;
720 string lock_name;
721 string cookie;
722
723 rgw_raw_obj obj;
724
725 RGWAsyncUnlockSystemObj *req;
726
727 public:
728 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
729 const rgw_raw_obj& _obj,
730 const string& _lock_name,
731 const string& _cookie);
732 ~RGWSimpleRadosUnlockCR() override {
733 request_cleanup();
734 }
735 void request_cleanup() override;
736
737 int send_request() override;
738 int request_complete() override;
739 };
740
741 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
742
743 class RGWOmapAppend : public RGWConsumerCR<string> {
744 RGWAsyncRadosProcessor *async_rados;
745 rgw::sal::RGWRadosStore *store;
746
747 rgw_raw_obj obj;
748
749 bool going_down;
750
751 int num_pending_entries;
752 list<string> pending_entries;
753
754 map<string, bufferlist> entries;
755
756 uint64_t window_size;
757 uint64_t total_entries;
758 public:
759 RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
760 const rgw_raw_obj& _obj,
761 uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
762 int operate() override;
763 void flush_pending();
764 bool append(const string& s);
765 bool finish();
766
767 uint64_t get_total_entries() {
768 return total_entries;
769 }
770
771 const rgw_raw_obj& get_obj() {
772 return obj;
773 }
774 };
775
776 class RGWShardedOmapCRManager {
777 RGWAsyncRadosProcessor *async_rados;
778 rgw::sal::RGWRadosStore *store;
779 RGWCoroutine *op;
780
781 int num_shards;
782
783 vector<RGWOmapAppend *> shards;
784 public:
785 RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix)
786 : async_rados(_async_rados),
787 store(_store), op(_op), num_shards(_num_shards) {
788 shards.reserve(num_shards);
789 for (int i = 0; i < num_shards; ++i) {
790 char buf[oid_prefix.size() + 16];
791 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
792 RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
793 shard->get();
794 shards.push_back(shard);
795 op->spawn(shard, false);
796 }
797 }
798
799 ~RGWShardedOmapCRManager() {
800 for (auto shard : shards) {
801 shard->put();
802 }
803 }
804
805 bool append(const string& entry, int shard_id) {
806 return shards[shard_id]->append(entry);
807 }
808 bool finish() {
809 bool success = true;
810 for (vector<RGWOmapAppend *>::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
811 success &= ((*iter)->finish() && (!(*iter)->is_error()));
812 }
813 return success;
814 }
815
816 uint64_t get_total_entries(int shard_id) {
817 return shards[shard_id]->get_total_entries();
818 }
819 };
820
821 class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
822 rgw::sal::RGWRadosStore *store;
823 rgw_bucket bucket;
824
825 protected:
826 int _send_request() override;
827 public:
828 RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
829 rgw::sal::RGWRadosStore *_store, const rgw_bucket& bucket)
830 : RGWAsyncRadosRequest(caller, cn), store(_store), bucket(bucket) {}
831
832 RGWBucketInfo bucket_info;
833 map<string, bufferlist> attrs;
834 };
835
836 class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
837 RGWAsyncRadosProcessor *async_rados;
838 rgw::sal::RGWRadosStore *store;
839 rgw_bucket bucket;
840 RGWBucketInfo *bucket_info;
841 map<string, bufferlist> *pattrs;
842
843 RGWAsyncGetBucketInstanceInfo *req{nullptr};
844
845 public:
846 // rgw_bucket constructor
847 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
848 const rgw_bucket& _bucket, RGWBucketInfo *_bucket_info,
849 map<string, bufferlist> *_pattrs)
850 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
851 bucket(_bucket), bucket_info(_bucket_info), pattrs(_pattrs) {}
852 ~RGWGetBucketInstanceInfoCR() override {
853 request_cleanup();
854 }
855 void request_cleanup() override {
856 if (req) {
857 req->finish();
858 req = NULL;
859 }
860 }
861
862 int send_request() override {
863 req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, bucket);
864 async_rados->queue(req);
865 return 0;
866 }
867 int request_complete() override {
868 if (bucket_info) {
869 *bucket_info = std::move(req->bucket_info);
870 }
871 if (pattrs) {
872 *pattrs = std::move(req->attrs);
873 }
874 return req->get_ret_status();
875 }
876 };
877
878 class RGWRadosBILogTrimCR : public RGWSimpleCoroutine {
879 RGWRados::BucketShard bs;
880 std::string start_marker;
881 std::string end_marker;
882 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
883 public:
884 RGWRadosBILogTrimCR(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& bucket_info,
885 int shard_id, const std::string& start_marker,
886 const std::string& end_marker);
887
888 int send_request() override;
889 int request_complete() override;
890 };
891
892 class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
893 rgw::sal::RGWRadosStore *store;
894 rgw_zone_id source_zone;
895
896 std::optional<rgw_user> user_id;
897
898 rgw_bucket src_bucket;
899 std::optional<rgw_placement_rule> dest_placement_rule;
900 RGWBucketInfo dest_bucket_info;
901
902 rgw_obj_key key;
903 std::optional<rgw_obj_key> dest_key;
904 std::optional<uint64_t> versioned_epoch;
905
906 real_time src_mtime;
907
908 bool copy_if_newer;
909 std::shared_ptr<RGWFetchObjFilter> filter;
910 rgw_zone_set zones_trace;
911 PerfCounters* counters;
912 const DoutPrefixProvider *dpp;
913
914 protected:
915 int _send_request() override;
916 public:
917 RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
918 const rgw_zone_id& _source_zone,
919 std::optional<rgw_user>& _user_id,
920 const rgw_bucket& _src_bucket,
921 std::optional<rgw_placement_rule> _dest_placement_rule,
922 const RGWBucketInfo& _dest_bucket_info,
923 const rgw_obj_key& _key,
924 const std::optional<rgw_obj_key>& _dest_key,
925 std::optional<uint64_t> _versioned_epoch,
926 bool _if_newer,
927 std::shared_ptr<RGWFetchObjFilter> _filter,
928 rgw_zone_set *_zones_trace,
929 PerfCounters* counters, const DoutPrefixProvider *dpp)
930 : RGWAsyncRadosRequest(caller, cn), store(_store),
931 source_zone(_source_zone),
932 user_id(_user_id),
933 src_bucket(_src_bucket),
934 dest_placement_rule(_dest_placement_rule),
935 dest_bucket_info(_dest_bucket_info),
936 key(_key),
937 dest_key(_dest_key),
938 versioned_epoch(_versioned_epoch),
939 copy_if_newer(_if_newer),
940 filter(_filter),
941 counters(counters),
942 dpp(dpp)
943 {
944 if (_zones_trace) {
945 zones_trace = *_zones_trace;
946 }
947 }
948 };
949
950 class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
951 CephContext *cct;
952 RGWAsyncRadosProcessor *async_rados;
953 rgw::sal::RGWRadosStore *store;
954 rgw_zone_id source_zone;
955
956 std::optional<rgw_user> user_id;
957
958 rgw_bucket src_bucket;
959 std::optional<rgw_placement_rule> dest_placement_rule;
960 RGWBucketInfo dest_bucket_info;
961
962 rgw_obj_key key;
963 std::optional<rgw_obj_key> dest_key;
964 std::optional<uint64_t> versioned_epoch;
965
966 real_time src_mtime;
967
968 bool copy_if_newer;
969
970 std::shared_ptr<RGWFetchObjFilter> filter;
971
972 RGWAsyncFetchRemoteObj *req;
973 rgw_zone_set *zones_trace;
974 PerfCounters* counters;
975 const DoutPrefixProvider *dpp;
976
977 public:
978 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
979 const rgw_zone_id& _source_zone,
980 std::optional<rgw_user> _user_id,
981 const rgw_bucket& _src_bucket,
982 std::optional<rgw_placement_rule> _dest_placement_rule,
983 const RGWBucketInfo& _dest_bucket_info,
984 const rgw_obj_key& _key,
985 const std::optional<rgw_obj_key>& _dest_key,
986 std::optional<uint64_t> _versioned_epoch,
987 bool _if_newer,
988 std::shared_ptr<RGWFetchObjFilter> _filter,
989 rgw_zone_set *_zones_trace,
990 PerfCounters* counters, const DoutPrefixProvider *dpp)
991 : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
992 async_rados(_async_rados), store(_store),
993 source_zone(_source_zone),
994 user_id(_user_id),
995 src_bucket(_src_bucket),
996 dest_placement_rule(_dest_placement_rule),
997 dest_bucket_info(_dest_bucket_info),
998 key(_key),
999 dest_key(_dest_key),
1000 versioned_epoch(_versioned_epoch),
1001 copy_if_newer(_if_newer),
1002 filter(_filter),
1003 req(NULL),
1004 zones_trace(_zones_trace), counters(counters), dpp(dpp) {}
1005
1006
1007 ~RGWFetchRemoteObjCR() override {
1008 request_cleanup();
1009 }
1010
1011 void request_cleanup() override {
1012 if (req) {
1013 req->finish();
1014 req = NULL;
1015 }
1016 }
1017
1018 int send_request() override {
1019 req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store,
1020 source_zone, user_id, src_bucket, dest_placement_rule, dest_bucket_info,
1021 key, dest_key, versioned_epoch, copy_if_newer, filter,
1022 zones_trace, counters, dpp);
1023 async_rados->queue(req);
1024 return 0;
1025 }
1026
1027 int request_complete() override {
1028 return req->get_ret_status();
1029 }
1030 };
1031
1032 class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
1033 rgw::sal::RGWRadosStore *store;
1034 rgw_zone_id source_zone;
1035
1036 rgw_bucket src_bucket;
1037 rgw_obj_key key;
1038
1039 ceph::real_time *pmtime;
1040 uint64_t *psize;
1041 string *petag;
1042 map<string, bufferlist> *pattrs;
1043 map<string, string> *pheaders;
1044
1045 protected:
1046 int _send_request() override;
1047 public:
1048 RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
1049 const rgw_zone_id& _source_zone,
1050 rgw_bucket& _src_bucket,
1051 const rgw_obj_key& _key,
1052 ceph::real_time *_pmtime,
1053 uint64_t *_psize,
1054 string *_petag,
1055 map<string, bufferlist> *_pattrs,
1056 map<string, string> *_pheaders) : RGWAsyncRadosRequest(caller, cn), store(_store),
1057 source_zone(_source_zone),
1058 src_bucket(_src_bucket),
1059 key(_key),
1060 pmtime(_pmtime),
1061 psize(_psize),
1062 petag(_petag),
1063 pattrs(_pattrs),
1064 pheaders(_pheaders) {}
1065 };
1066
1067 class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
1068 CephContext *cct;
1069 RGWAsyncRadosProcessor *async_rados;
1070 rgw::sal::RGWRadosStore *store;
1071 rgw_zone_id source_zone;
1072
1073 rgw_bucket src_bucket;
1074 rgw_obj_key key;
1075
1076 ceph::real_time *pmtime;
1077 uint64_t *psize;
1078 string *petag;
1079 map<string, bufferlist> *pattrs;
1080 map<string, string> *pheaders;
1081
1082 RGWAsyncStatRemoteObj *req;
1083
1084 public:
1085 RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
1086 const rgw_zone_id& _source_zone,
1087 rgw_bucket& _src_bucket,
1088 const rgw_obj_key& _key,
1089 ceph::real_time *_pmtime,
1090 uint64_t *_psize,
1091 string *_petag,
1092 map<string, bufferlist> *_pattrs,
1093 map<string, string> *_pheaders) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
1094 async_rados(_async_rados), store(_store),
1095 source_zone(_source_zone),
1096 src_bucket(_src_bucket),
1097 key(_key),
1098 pmtime(_pmtime),
1099 psize(_psize),
1100 petag(_petag),
1101 pattrs(_pattrs),
1102 pheaders(_pheaders),
1103 req(NULL) {}
1104
1105
1106 ~RGWStatRemoteObjCR() override {
1107 request_cleanup();
1108 }
1109
1110 void request_cleanup() override {
1111 if (req) {
1112 req->finish();
1113 req = NULL;
1114 }
1115 }
1116
1117 int send_request() override {
1118 req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
1119 src_bucket, key, pmtime, psize, petag, pattrs, pheaders);
1120 async_rados->queue(req);
1121 return 0;
1122 }
1123
1124 int request_complete() override {
1125 return req->get_ret_status();
1126 }
1127 };
1128
1129 class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
1130 rgw::sal::RGWRadosStore *store;
1131 rgw_zone_id source_zone;
1132
1133 RGWBucketInfo bucket_info;
1134
1135 rgw_obj_key key;
1136 string owner;
1137 string owner_display_name;
1138 bool versioned;
1139 uint64_t versioned_epoch;
1140 string marker_version_id;
1141
1142 bool del_if_older;
1143 ceph::real_time timestamp;
1144 rgw_zone_set zones_trace;
1145
1146 protected:
1147 int _send_request() override;
1148 public:
1149 RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
1150 const rgw_zone_id& _source_zone,
1151 RGWBucketInfo& _bucket_info,
1152 const rgw_obj_key& _key,
1153 const string& _owner,
1154 const string& _owner_display_name,
1155 bool _versioned,
1156 uint64_t _versioned_epoch,
1157 bool _delete_marker,
1158 bool _if_older,
1159 real_time& _timestamp,
1160 rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
1161 source_zone(_source_zone),
1162 bucket_info(_bucket_info),
1163 key(_key),
1164 owner(_owner),
1165 owner_display_name(_owner_display_name),
1166 versioned(_versioned),
1167 versioned_epoch(_versioned_epoch),
1168 del_if_older(_if_older),
1169 timestamp(_timestamp) {
1170 if (_delete_marker) {
1171 marker_version_id = key.instance;
1172 }
1173
1174 if (_zones_trace) {
1175 zones_trace = *_zones_trace;
1176 }
1177 }
1178 };
1179
1180 class RGWRemoveObjCR : public RGWSimpleCoroutine {
1181 CephContext *cct;
1182 RGWAsyncRadosProcessor *async_rados;
1183 rgw::sal::RGWRadosStore *store;
1184 rgw_zone_id source_zone;
1185
1186 RGWBucketInfo bucket_info;
1187
1188 rgw_obj_key key;
1189 bool versioned;
1190 uint64_t versioned_epoch;
1191 bool delete_marker;
1192 string owner;
1193 string owner_display_name;
1194
1195 bool del_if_older;
1196 real_time timestamp;
1197
1198 RGWAsyncRemoveObj *req;
1199
1200 rgw_zone_set *zones_trace;
1201
1202 public:
1203 RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
1204 const rgw_zone_id& _source_zone,
1205 RGWBucketInfo& _bucket_info,
1206 const rgw_obj_key& _key,
1207 bool _versioned,
1208 uint64_t _versioned_epoch,
1209 string *_owner,
1210 string *_owner_display_name,
1211 bool _delete_marker,
1212 real_time *_timestamp,
1213 rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
1214 async_rados(_async_rados), store(_store),
1215 source_zone(_source_zone),
1216 bucket_info(_bucket_info),
1217 key(_key),
1218 versioned(_versioned),
1219 versioned_epoch(_versioned_epoch),
1220 delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
1221 del_if_older = (_timestamp != NULL);
1222 if (_timestamp) {
1223 timestamp = *_timestamp;
1224 }
1225
1226 if (_owner) {
1227 owner = *_owner;
1228 }
1229
1230 if (_owner_display_name) {
1231 owner_display_name = *_owner_display_name;
1232 }
1233 }
1234 ~RGWRemoveObjCR() override {
1235 request_cleanup();
1236 }
1237
1238 void request_cleanup() override {
1239 if (req) {
1240 req->finish();
1241 req = NULL;
1242 }
1243 }
1244
1245 int send_request() override {
1246 req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
1247 key, owner, owner_display_name, versioned, versioned_epoch,
1248 delete_marker, del_if_older, timestamp, zones_trace);
1249 async_rados->queue(req);
1250 return 0;
1251 }
1252
1253 int request_complete() override {
1254 return req->get_ret_status();
1255 }
1256 };
1257
1258 class RGWContinuousLeaseCR : public RGWCoroutine {
1259 RGWAsyncRadosProcessor *async_rados;
1260 rgw::sal::RGWRadosStore *store;
1261
1262 const rgw_raw_obj obj;
1263
1264 const string lock_name;
1265 const string cookie;
1266
1267 int interval;
1268 bool going_down{ false };
1269 bool locked{false};
1270
1271 RGWCoroutine *caller;
1272
1273 bool aborted{false};
1274
1275 public:
1276 RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
1277 const rgw_raw_obj& _obj,
1278 const string& _lock_name, int _interval, RGWCoroutine *_caller)
1279 : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
1280 obj(_obj), lock_name(_lock_name),
1281 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
1282 interval(_interval), caller(_caller)
1283 {}
1284
1285 int operate() override;
1286
1287 bool is_locked() const {
1288 return locked;
1289 }
1290
1291 void set_locked(bool status) {
1292 locked = status;
1293 }
1294
1295 void go_down() {
1296 going_down = true;
1297 wakeup();
1298 }
1299
1300 void abort() {
1301 aborted = true;
1302 }
1303 };
1304
1305 class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
1306 rgw::sal::RGWRadosStore *store;
1307 list<cls_log_entry> entries;
1308
1309 string oid;
1310
1311 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1312
1313 public:
1314 RGWRadosTimelogAddCR(rgw::sal::RGWRadosStore *_store, const string& _oid,
1315 const cls_log_entry& entry);
1316
1317 int send_request() override;
1318 int request_complete() override;
1319 };
1320
1321 class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
1322 rgw::sal::RGWRadosStore *store;
1323 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1324 protected:
1325 std::string oid;
1326 real_time start_time;
1327 real_time end_time;
1328 std::string from_marker;
1329 std::string to_marker;
1330
1331 public:
1332 RGWRadosTimelogTrimCR(rgw::sal::RGWRadosStore *store, const std::string& oid,
1333 const real_time& start_time, const real_time& end_time,
1334 const std::string& from_marker,
1335 const std::string& to_marker);
1336
1337 int send_request() override;
1338 int request_complete() override;
1339 };
1340
1341 // wrapper to update last_trim_marker on success
1342 class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
1343 CephContext *cct;
1344 std::string *last_trim_marker;
1345 public:
1346 static constexpr const char* max_marker = "99999999";
1347
1348 RGWSyncLogTrimCR(rgw::sal::RGWRadosStore *store, const std::string& oid,
1349 const std::string& to_marker, std::string *last_trim_marker);
1350 int request_complete() override;
1351 };
1352
1353 class RGWAsyncStatObj : public RGWAsyncRadosRequest {
1354 rgw::sal::RGWRadosStore *store;
1355 RGWBucketInfo bucket_info;
1356 rgw_obj obj;
1357 uint64_t *psize;
1358 real_time *pmtime;
1359 uint64_t *pepoch;
1360 RGWObjVersionTracker *objv_tracker;
1361 protected:
1362 int _send_request() override;
1363 public:
1364 RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *store,
1365 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1366 real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
1367 RGWObjVersionTracker *objv_tracker = nullptr)
1368 : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize),
1369 pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
1370 };
1371
1372 class RGWStatObjCR : public RGWSimpleCoroutine {
1373 rgw::sal::RGWRadosStore *store;
1374 RGWAsyncRadosProcessor *async_rados;
1375 RGWBucketInfo bucket_info;
1376 rgw_obj obj;
1377 uint64_t *psize;
1378 real_time *pmtime;
1379 uint64_t *pepoch;
1380 RGWObjVersionTracker *objv_tracker;
1381 RGWAsyncStatObj *req = nullptr;
1382 public:
1383 RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, rgw::sal::RGWRadosStore *store,
1384 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1385 real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
1386 RGWObjVersionTracker *objv_tracker = nullptr);
1387 ~RGWStatObjCR() override {
1388 request_cleanup();
1389 }
1390 void request_cleanup() override;
1391
1392 int send_request() override;
1393 int request_complete() override;
1394 };
1395
1396 /// coroutine wrapper for IoCtx::aio_notify()
1397 class RGWRadosNotifyCR : public RGWSimpleCoroutine {
1398 rgw::sal::RGWRadosStore *const store;
1399 const rgw_raw_obj obj;
1400 bufferlist request;
1401 const uint64_t timeout_ms;
1402 bufferlist *response;
1403 rgw_rados_ref ref;
1404 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1405
1406 public:
1407 RGWRadosNotifyCR(rgw::sal::RGWRadosStore *store, const rgw_raw_obj& obj,
1408 bufferlist& request, uint64_t timeout_ms,
1409 bufferlist *response);
1410
1411 int send_request() override;
1412 int request_complete() override;
1413 };
1414
1415 #endif