]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_cr_rados.h
update sources to 12.2.10
[ceph.git] / ceph / src / rgw / rgw_cr_rados.h
CommitLineData
7c673cae
FG
1#ifndef CEPH_RGW_CR_RADOS_H
2#define CEPH_RGW_CR_RADOS_H
3
4#include <boost/intrusive_ptr.hpp>
5#include "include/assert.h"
6#include "rgw_coroutine.h"
7#include "rgw_rados.h"
8#include "common/WorkQueue.h"
9#include "common/Throttle.h"
10
11#include <atomic>
12
13class RGWAsyncRadosRequest : public RefCountedObject {
14 RGWCoroutine *caller;
15 RGWAioCompletionNotifier *notifier;
16
17 int retcode;
18
19 Mutex lock;
20
21protected:
22 virtual int _send_request() = 0;
23public:
24 RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0),
25 lock("RGWAsyncRadosRequest::lock") {
26 }
27 ~RGWAsyncRadosRequest() override {
28 if (notifier) {
29 notifier->put();
30 }
31 }
32
33 void send_request() {
34 get();
35 retcode = _send_request();
36 {
37 Mutex::Locker l(lock);
38 if (notifier) {
39 notifier->cb(); // drops its own ref
40 notifier = nullptr;
41 }
42 }
43 put();
44 }
45
46 int get_ret_status() { return retcode; }
47
48 void finish() {
49 {
50 Mutex::Locker l(lock);
51 if (notifier) {
52 // we won't call notifier->cb() to drop its ref, so drop it here
53 notifier->put();
54 notifier = nullptr;
55 }
56 }
57 put();
58 }
59};
60
61
62class RGWAsyncRadosProcessor {
63 deque<RGWAsyncRadosRequest *> m_req_queue;
64 std::atomic<bool> going_down = { false };
65protected:
66 RGWRados *store;
67 ThreadPool m_tp;
68 Throttle req_throttle;
69
70 struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
71 RGWAsyncRadosProcessor *processor;
72 RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
73 : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
74
75 bool _enqueue(RGWAsyncRadosRequest *req) override;
76 void _dequeue(RGWAsyncRadosRequest *req) override {
77 ceph_abort();
78 }
79 bool _empty() override;
80 RGWAsyncRadosRequest *_dequeue() override;
81 using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
82 void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
83 void _dump_queue();
84 void _clear() override {
85 assert(processor->m_req_queue.empty());
86 }
87 } req_wq;
88
89public:
90 RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
91 ~RGWAsyncRadosProcessor() {}
92 void start();
93 void stop();
94 void handle_request(RGWAsyncRadosRequest *req);
95 void queue(RGWAsyncRadosRequest *req);
96
97 bool is_going_down() {
98 return going_down;
99 }
100};
101
102
103class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
104 RGWRados *store;
91327a77 105 RGWObjectCtx obj_ctx;
7c673cae 106 RGWRados::SystemObject::Read::GetObjState read_state;
91327a77 107 RGWObjVersionTracker objv_tracker;
7c673cae 108 rgw_raw_obj obj;
7c673cae
FG
109 off_t ofs;
110 off_t end;
91327a77 111 const bool want_attrs;
7c673cae
FG
112protected:
113 int _send_request() override;
114public:
91327a77 115 RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
7c673cae 116 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
91327a77
AA
117 off_t _ofs, off_t _end, bool want_attrs);
118
119 bufferlist bl;
120 map<string, bufferlist> attrs;
7c673cae
FG
121};
122
123class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
124 RGWRados *store;
7c673cae
FG
125 rgw_raw_obj obj;
126 bool exclusive;
127 bufferlist bl;
128
129protected:
130 int _send_request() override;
131public:
132 RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
91327a77
AA
133 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
134 bool _exclusive, bufferlist _bl);
135
136 RGWObjVersionTracker objv_tracker;
7c673cae
FG
137};
138
139class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
140 RGWRados *store;
7c673cae 141 rgw_raw_obj obj;
91327a77 142 map<string, bufferlist> attrs;
7c673cae
FG
143
144protected:
145 int _send_request() override;
146public:
147 RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
148 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
91327a77
AA
149 map<string, bufferlist> _attrs);
150
151 RGWObjVersionTracker objv_tracker;
7c673cae
FG
152};
153
154class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
155 RGWRados *store;
156 rgw_raw_obj obj;
157 string lock_name;
158 string cookie;
159 uint32_t duration_secs;
160
161protected:
162 int _send_request() override;
163public:
164 RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
165 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
166 const string& _name, const string& _cookie, uint32_t _duration_secs);
167};
168
169class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
170 RGWRados *store;
171 rgw_raw_obj obj;
172 string lock_name;
173 string cookie;
174
175protected:
176 int _send_request() override;
177public:
178 RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
179 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
180 const string& _name, const string& _cookie);
181};
182
7c673cae
FG
183template <class T>
184class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
185 RGWAsyncRadosProcessor *async_rados;
186 RGWRados *store;
7c673cae 187 rgw_raw_obj obj;
7c673cae
FG
188 T *result;
189 /// on ENOENT, call handle_data() with an empty object instead of failing
190 const bool empty_on_enoent;
191 RGWObjVersionTracker *objv_tracker;
7c673cae
FG
192 RGWAsyncGetSystemObj *req{nullptr};
193
194public:
195 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
196 const rgw_raw_obj& _obj,
197 T *_result, bool empty_on_enoent = true,
198 RGWObjVersionTracker *objv_tracker = nullptr)
199 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
91327a77 200 obj(_obj), result(_result),
7c673cae
FG
201 empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
202 ~RGWSimpleRadosReadCR() override {
203 request_cleanup();
204 }
205
206 void request_cleanup() override {
207 if (req) {
208 req->finish();
209 req = NULL;
210 }
211 }
212
213 int send_request() override;
214 int request_complete() override;
215
216 virtual int handle_data(T& data) {
217 return 0;
218 }
219};
220
221template <class T>
222int RGWSimpleRadosReadCR<T>::send_request()
223{
224 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
91327a77 225 store, objv_tracker, obj, 0, -1, false);
7c673cae
FG
226 async_rados->queue(req);
227 return 0;
228}
229
230template <class T>
231int RGWSimpleRadosReadCR<T>::request_complete()
232{
233 int ret = req->get_ret_status();
234 retcode = ret;
235 if (ret == -ENOENT && empty_on_enoent) {
236 *result = T();
237 } else {
238 if (ret < 0) {
239 return ret;
240 }
241 try {
91327a77 242 bufferlist::iterator iter = req->bl.begin();
7c673cae
FG
243 if (iter.end()) {
244 // allow successful reads with empty buffers. ReadSyncStatus coroutines
245 // depend on this to be able to read without locking, because the
246 // cls lock from InitSyncStatus will create an empty object if it didnt
247 // exist
248 *result = T();
249 } else {
250 ::decode(*result, iter);
251 }
252 } catch (buffer::error& err) {
253 return -EIO;
254 }
255 }
256
257 return handle_data(*result);
258}
259
260class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
261 RGWAsyncRadosProcessor *async_rados;
262 RGWRados *store;
7c673cae 263 rgw_raw_obj obj;
7c673cae 264 map<string, bufferlist> *pattrs;
7c673cae
FG
265 RGWAsyncGetSystemObj *req;
266
267public:
268 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
269 const rgw_raw_obj& _obj,
270 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()),
271 async_rados(_async_rados), store(_store),
7c673cae
FG
272 obj(_obj),
273 pattrs(_pattrs),
274 req(NULL) { }
275 ~RGWSimpleRadosReadAttrsCR() override {
276 request_cleanup();
277 }
278
279 void request_cleanup() override {
280 if (req) {
281 req->finish();
282 req = NULL;
283 }
284 }
285
286 int send_request() override;
287 int request_complete() override;
288};
289
290template <class T>
291class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
292 RGWAsyncRadosProcessor *async_rados;
293 RGWRados *store;
294 bufferlist bl;
7c673cae
FG
295 rgw_raw_obj obj;
296 RGWObjVersionTracker *objv_tracker;
7c673cae
FG
297 RGWAsyncPutSystemObj *req{nullptr};
298
299public:
300 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
301 const rgw_raw_obj& _obj,
302 const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
303 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
304 store(_store), obj(_obj), objv_tracker(objv_tracker) {
305 ::encode(_data, bl);
306 }
307
308 ~RGWSimpleRadosWriteCR() override {
309 request_cleanup();
310 }
311
312 void request_cleanup() override {
313 if (req) {
314 req->finish();
315 req = NULL;
316 }
317 }
318
319 int send_request() override {
320 req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
91327a77 321 store, objv_tracker, obj, false, std::move(bl));
7c673cae
FG
322 async_rados->queue(req);
323 return 0;
324 }
325
326 int request_complete() override {
91327a77
AA
327 if (objv_tracker) { // copy the updated version
328 *objv_tracker = req->objv_tracker;
329 }
7c673cae
FG
330 return req->get_ret_status();
331 }
332};
333
334class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
335 RGWAsyncRadosProcessor *async_rados;
336 RGWRados *store;
91327a77 337 RGWObjVersionTracker *objv_tracker;
7c673cae 338 rgw_raw_obj obj;
7c673cae 339 map<string, bufferlist> attrs;
91327a77 340 RGWAsyncPutSystemObjAttrs *req = nullptr;
7c673cae
FG
341
342public:
91327a77
AA
343 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados,
344 RGWRados *_store, const rgw_raw_obj& _obj,
345 map<string, bufferlist> _attrs,
346 RGWObjVersionTracker *objv_tracker = nullptr)
347 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
348 store(_store), objv_tracker(objv_tracker), obj(_obj),
349 attrs(std::move(_attrs)) {
7c673cae
FG
350 }
351 ~RGWSimpleRadosWriteAttrsCR() override {
352 request_cleanup();
353 }
354
355 void request_cleanup() override {
356 if (req) {
357 req->finish();
358 req = NULL;
359 }
360 }
361
362 int send_request() override {
363 req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
91327a77 364 store, objv_tracker, obj, std::move(attrs));
7c673cae
FG
365 async_rados->queue(req);
366 return 0;
367 }
368
369 int request_complete() override {
91327a77
AA
370 if (objv_tracker) { // copy the updated version
371 *objv_tracker = req->objv_tracker;
372 }
7c673cae
FG
373 return req->get_ret_status();
374 }
375};
376
377class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
378 RGWRados *store;
379 map<string, bufferlist> entries;
380
381 rgw_rados_ref ref;
382
383 rgw_raw_obj obj;
384
224ce89b 385 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
7c673cae
FG
386
387public:
388 RGWRadosSetOmapKeysCR(RGWRados *_store,
389 const rgw_raw_obj& _obj,
390 map<string, bufferlist>& _entries);
391
7c673cae
FG
392 int send_request() override;
393 int request_complete() override;
394};
395
396class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
397 RGWRados *store;
398
399 string marker;
28e407b8 400 std::set<std::string> *entries;
7c673cae
FG
401 int max_entries;
402
7c673cae
FG
403 rgw_rados_ref ref;
404
405 rgw_raw_obj obj;
406
224ce89b 407 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
7c673cae
FG
408
409public:
410 RGWRadosGetOmapKeysCR(RGWRados *_store,
411 const rgw_raw_obj& _obj,
412 const string& _marker,
28e407b8 413 std::set<std::string> *_entries, int _max_entries);
7c673cae 414
7c673cae 415 int send_request() override;
28e407b8 416 int request_complete() override;
7c673cae
FG
417};
418
419class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
420 RGWRados *store;
421
7c673cae
FG
422 rgw_rados_ref ref;
423
424 set<string> keys;
425
426 rgw_raw_obj obj;
427
224ce89b 428 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
7c673cae
FG
429
430public:
431 RGWRadosRemoveOmapKeysCR(RGWRados *_store,
432 const rgw_raw_obj& _obj,
433 const set<string>& _keys);
434
7c673cae
FG
435 int send_request() override;
436
224ce89b 437 int request_complete() override;
7c673cae
FG
438};
439
440class RGWRadosRemoveCR : public RGWSimpleCoroutine {
441 RGWRados *store;
442 librados::IoCtx ioctx;
443 const rgw_raw_obj obj;
444 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
445
446public:
447 RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
448
449 int send_request();
450 int request_complete();
451};
452
453class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
454 RGWAsyncRadosProcessor *async_rados;
455 RGWRados *store;
456 string lock_name;
457 string cookie;
458 uint32_t duration;
459
460 rgw_raw_obj obj;
461
462 RGWAsyncLockSystemObj *req;
463
464public:
465 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
466 const rgw_raw_obj& _obj,
467 const string& _lock_name,
468 const string& _cookie,
469 uint32_t _duration);
470 ~RGWSimpleRadosLockCR() override {
471 request_cleanup();
472 }
473 void request_cleanup() override;
474
475 int send_request() override;
476 int request_complete() override;
477
478 static std::string gen_random_cookie(CephContext* cct) {
479#define COOKIE_LEN 16
480 char buf[COOKIE_LEN + 1];
481 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
482 return buf;
483 }
484};
485
486class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
487 RGWAsyncRadosProcessor *async_rados;
488 RGWRados *store;
489 string lock_name;
490 string cookie;
491
492 rgw_raw_obj obj;
493
494 RGWAsyncUnlockSystemObj *req;
495
496public:
497 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
498 const rgw_raw_obj& _obj,
499 const string& _lock_name,
500 const string& _cookie);
501 ~RGWSimpleRadosUnlockCR() override {
502 request_cleanup();
503 }
504 void request_cleanup() override;
505
506 int send_request() override;
507 int request_complete() override;
508};
509
510#define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
511
512class RGWOmapAppend : public RGWConsumerCR<string> {
513 RGWAsyncRadosProcessor *async_rados;
514 RGWRados *store;
515
516 rgw_raw_obj obj;
517
518 bool going_down;
519
520 int num_pending_entries;
521 list<string> pending_entries;
522
523 map<string, bufferlist> entries;
524
525 uint64_t window_size;
526 uint64_t total_entries;
527public:
528 RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
529 const rgw_raw_obj& _obj,
530 uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
531 int operate() override;
532 void flush_pending();
533 bool append(const string& s);
534 bool finish();
535
536 uint64_t get_total_entries() {
537 return total_entries;
538 }
539
540 const rgw_raw_obj& get_obj() {
541 return obj;
542 }
543};
544
545class RGWAsyncWait : public RGWAsyncRadosRequest {
546 CephContext *cct;
547 Mutex *lock;
548 Cond *cond;
549 utime_t interval;
550protected:
551 int _send_request() override {
552 Mutex::Locker l(*lock);
553 return cond->WaitInterval(*lock, interval);
554 }
555public:
556 RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
557 Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn),
558 cct(_cct),
559 lock(_lock), cond(_cond), interval(_secs, 0) {}
560
561 void wakeup() {
562 Mutex::Locker l(*lock);
563 cond->Signal();
564 }
565};
566
567class RGWWaitCR : public RGWSimpleCoroutine {
568 CephContext *cct;
569 RGWAsyncRadosProcessor *async_rados;
570 Mutex *lock;
571 Cond *cond;
572 int secs;
573
574 RGWAsyncWait *req;
575
576public:
577 RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
578 Mutex *_lock, Cond *_cond,
579 int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
580 async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
581 }
582 ~RGWWaitCR() override {
583 request_cleanup();
584 }
585
586 void request_cleanup() override {
587 if (req) {
588 wakeup();
589 req->finish();
590 req = NULL;
591 }
592 }
593
594 int send_request() override {
595 req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct, lock, cond, secs);
596 async_rados->queue(req);
597 return 0;
598 }
599
600 int request_complete() override {
601 return req->get_ret_status();
602 }
603
604 void wakeup() {
605 req->wakeup();
606 }
607};
608
609class RGWShardedOmapCRManager {
610 RGWAsyncRadosProcessor *async_rados;
611 RGWRados *store;
612 RGWCoroutine *op;
613
614 int num_shards;
615
616 vector<RGWOmapAppend *> shards;
617public:
618 RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix)
619 : async_rados(_async_rados),
620 store(_store), op(_op), num_shards(_num_shards) {
621 shards.reserve(num_shards);
622 for (int i = 0; i < num_shards; ++i) {
623 char buf[oid_prefix.size() + 16];
624 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
625 RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
626 shard->get();
627 shards.push_back(shard);
628 op->spawn(shard, false);
629 }
630 }
631
632 ~RGWShardedOmapCRManager() {
633 for (auto shard : shards) {
634 shard->put();
635 }
636 }
637
638 bool append(const string& entry, int shard_id) {
639 return shards[shard_id]->append(entry);
640 }
641 bool finish() {
642 bool success = true;
643 for (vector<RGWOmapAppend *>::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
644 success &= ((*iter)->finish() && (!(*iter)->is_error()));
645 }
646 return success;
647 }
648
649 uint64_t get_total_entries(int shard_id) {
650 return shards[shard_id]->get_total_entries();
651 }
652};
653
654class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
655 RGWRados *store;
b32b8144 656 const std::string oid;
7c673cae
FG
657
658protected:
659 int _send_request() override;
660public:
661 RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
91327a77
AA
662 RGWRados *_store, const std::string& oid)
663 : RGWAsyncRadosRequest(caller, cn), store(_store), oid(oid) {}
664
665 RGWBucketInfo bucket_info;
7c673cae
FG
666};
667
668class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
669 RGWAsyncRadosProcessor *async_rados;
670 RGWRados *store;
b32b8144 671 const std::string oid;
7c673cae
FG
672 RGWBucketInfo *bucket_info;
673
b32b8144 674 RGWAsyncGetBucketInstanceInfo *req{nullptr};
7c673cae
FG
675
676public:
b32b8144
FG
677 // metadata key constructor
678 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
679 const std::string& meta_key, RGWBucketInfo *_bucket_info)
680 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
681 oid(RGW_BUCKET_INSTANCE_MD_PREFIX + meta_key),
682 bucket_info(_bucket_info) {}
683 // rgw_bucket constructor
7c673cae
FG
684 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
685 const rgw_bucket& bucket, RGWBucketInfo *_bucket_info)
686 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
b32b8144
FG
687 oid(RGW_BUCKET_INSTANCE_MD_PREFIX + bucket.get_key(':')),
688 bucket_info(_bucket_info) {}
7c673cae
FG
689 ~RGWGetBucketInstanceInfoCR() override {
690 request_cleanup();
691 }
692 void request_cleanup() override {
693 if (req) {
694 req->finish();
695 req = NULL;
696 }
697 }
698
699 int send_request() override {
91327a77 700 req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, oid);
7c673cae
FG
701 async_rados->queue(req);
702 return 0;
703 }
704 int request_complete() override {
91327a77
AA
705 if (bucket_info) {
706 *bucket_info = std::move(req->bucket_info);
707 }
7c673cae
FG
708 return req->get_ret_status();
709 }
710};
711
b32b8144
FG
712class RGWRadosBILogTrimCR : public RGWSimpleCoroutine {
713 RGWRados::BucketShard bs;
714 std::string start_marker;
715 std::string end_marker;
716 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
717 public:
718 RGWRadosBILogTrimCR(RGWRados *store, const RGWBucketInfo& bucket_info,
719 int shard_id, const std::string& start_marker,
720 const std::string& end_marker);
721
722 int send_request() override;
723 int request_complete() override;
724};
725
7c673cae
FG
726class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
727 RGWRados *store;
728 string source_zone;
729
730 RGWBucketInfo bucket_info;
731
732 rgw_obj_key key;
91327a77 733 boost::optional<uint64_t> versioned_epoch;
7c673cae
FG
734
735 real_time src_mtime;
736
737 bool copy_if_newer;
91327a77 738 rgw_zone_set zones_trace;
7c673cae
FG
739
740protected:
741 int _send_request() override;
742public:
743 RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
744 const string& _source_zone,
745 RGWBucketInfo& _bucket_info,
746 const rgw_obj_key& _key,
91327a77 747 boost::optional<uint64_t> _versioned_epoch,
31f18b77 748 bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
7c673cae
FG
749 source_zone(_source_zone),
750 bucket_info(_bucket_info),
751 key(_key),
752 versioned_epoch(_versioned_epoch),
91327a77
AA
753 copy_if_newer(_if_newer)
754 {
755 if (_zones_trace) {
756 zones_trace = *_zones_trace;
757 }
758 }
7c673cae
FG
759};
760
761class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
762 CephContext *cct;
763 RGWAsyncRadosProcessor *async_rados;
764 RGWRados *store;
765 string source_zone;
766
767 RGWBucketInfo bucket_info;
768
769 rgw_obj_key key;
91327a77 770 boost::optional<uint64_t> versioned_epoch;
7c673cae
FG
771
772 real_time src_mtime;
773
774 bool copy_if_newer;
775
776 RGWAsyncFetchRemoteObj *req;
31f18b77 777 rgw_zone_set *zones_trace;
7c673cae
FG
778
779public:
780 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
781 const string& _source_zone,
782 RGWBucketInfo& _bucket_info,
783 const rgw_obj_key& _key,
91327a77 784 boost::optional<uint64_t> _versioned_epoch,
31f18b77 785 bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
7c673cae
FG
786 async_rados(_async_rados), store(_store),
787 source_zone(_source_zone),
788 bucket_info(_bucket_info),
789 key(_key),
790 versioned_epoch(_versioned_epoch),
31f18b77 791 copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
7c673cae
FG
792
793
794 ~RGWFetchRemoteObjCR() override {
795 request_cleanup();
796 }
797
798 void request_cleanup() override {
799 if (req) {
800 req->finish();
801 req = NULL;
802 }
803 }
804
805 int send_request() override {
806 req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
31f18b77 807 key, versioned_epoch, copy_if_newer, zones_trace);
7c673cae
FG
808 async_rados->queue(req);
809 return 0;
810 }
811
812 int request_complete() override {
813 return req->get_ret_status();
814 }
815};
816
817class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
818 RGWRados *store;
819 string source_zone;
820
821 RGWBucketInfo bucket_info;
822
823 rgw_obj_key key;
824
825 ceph::real_time *pmtime;
826 uint64_t *psize;
827 map<string, bufferlist> *pattrs;
828
829protected:
830 int _send_request() override;
831public:
832 RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
833 const string& _source_zone,
834 RGWBucketInfo& _bucket_info,
835 const rgw_obj_key& _key,
836 ceph::real_time *_pmtime,
837 uint64_t *_psize,
838 map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
839 source_zone(_source_zone),
840 bucket_info(_bucket_info),
841 key(_key),
842 pmtime(_pmtime),
843 psize(_psize),
844 pattrs(_pattrs) {}
845};
846
847class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
848 CephContext *cct;
849 RGWAsyncRadosProcessor *async_rados;
850 RGWRados *store;
851 string source_zone;
852
853 RGWBucketInfo bucket_info;
854
855 rgw_obj_key key;
856
857 ceph::real_time *pmtime;
858 uint64_t *psize;
859 map<string, bufferlist> *pattrs;
860
861 RGWAsyncStatRemoteObj *req;
862
863public:
864 RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
865 const string& _source_zone,
866 RGWBucketInfo& _bucket_info,
867 const rgw_obj_key& _key,
868 ceph::real_time *_pmtime,
869 uint64_t *_psize,
870 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
871 async_rados(_async_rados), store(_store),
872 source_zone(_source_zone),
873 bucket_info(_bucket_info),
874 key(_key),
875 pmtime(_pmtime),
876 psize(_psize),
877 pattrs(_pattrs),
878 req(NULL) {}
879
880
881 ~RGWStatRemoteObjCR() override {
882 request_cleanup();
883 }
884
885 void request_cleanup() override {
886 if (req) {
887 req->finish();
888 req = NULL;
889 }
890 }
891
892 int send_request() override {
893 req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
894 bucket_info, key, pmtime, psize, pattrs);
895 async_rados->queue(req);
896 return 0;
897 }
898
899 int request_complete() override {
900 return req->get_ret_status();
901 }
902};
903
904class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
905 RGWRados *store;
906 string source_zone;
907
908 RGWBucketInfo bucket_info;
909
910 rgw_obj_key key;
911 string owner;
912 string owner_display_name;
913 bool versioned;
914 uint64_t versioned_epoch;
915 string marker_version_id;
916
917 bool del_if_older;
918 ceph::real_time timestamp;
91327a77 919 rgw_zone_set zones_trace;
7c673cae
FG
920
921protected:
922 int _send_request() override;
923public:
924 RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
925 const string& _source_zone,
926 RGWBucketInfo& _bucket_info,
927 const rgw_obj_key& _key,
928 const string& _owner,
929 const string& _owner_display_name,
930 bool _versioned,
931 uint64_t _versioned_epoch,
932 bool _delete_marker,
933 bool _if_older,
31f18b77
FG
934 real_time& _timestamp,
935 rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
7c673cae
FG
936 source_zone(_source_zone),
937 bucket_info(_bucket_info),
938 key(_key),
939 owner(_owner),
940 owner_display_name(_owner_display_name),
941 versioned(_versioned),
942 versioned_epoch(_versioned_epoch),
943 del_if_older(_if_older),
91327a77 944 timestamp(_timestamp) {
7c673cae
FG
945 if (_delete_marker) {
946 marker_version_id = key.instance;
947 }
91327a77
AA
948
949 if (_zones_trace) {
950 zones_trace = *_zones_trace;
951 }
7c673cae
FG
952 }
953};
954
955class RGWRemoveObjCR : public RGWSimpleCoroutine {
956 CephContext *cct;
957 RGWAsyncRadosProcessor *async_rados;
958 RGWRados *store;
959 string source_zone;
960
961 RGWBucketInfo bucket_info;
962
963 rgw_obj_key key;
964 bool versioned;
965 uint64_t versioned_epoch;
966 bool delete_marker;
967 string owner;
968 string owner_display_name;
969
970 bool del_if_older;
971 real_time timestamp;
972
973 RGWAsyncRemoveObj *req;
31f18b77
FG
974
975 rgw_zone_set *zones_trace;
7c673cae
FG
976
977public:
978 RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
979 const string& _source_zone,
980 RGWBucketInfo& _bucket_info,
981 const rgw_obj_key& _key,
982 bool _versioned,
983 uint64_t _versioned_epoch,
984 string *_owner,
985 string *_owner_display_name,
986 bool _delete_marker,
31f18b77
FG
987 real_time *_timestamp,
988 rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
7c673cae
FG
989 async_rados(_async_rados), store(_store),
990 source_zone(_source_zone),
991 bucket_info(_bucket_info),
992 key(_key),
993 versioned(_versioned),
994 versioned_epoch(_versioned_epoch),
31f18b77 995 delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
7c673cae
FG
996 del_if_older = (_timestamp != NULL);
997 if (_timestamp) {
998 timestamp = *_timestamp;
999 }
1000
1001 if (_owner) {
1002 owner = *_owner;
1003 }
1004
1005 if (_owner_display_name) {
1006 owner_display_name = *_owner_display_name;
1007 }
1008 }
1009 ~RGWRemoveObjCR() override {
1010 request_cleanup();
1011 }
1012
1013 void request_cleanup() override {
1014 if (req) {
1015 req->finish();
1016 req = NULL;
1017 }
1018 }
1019
1020 int send_request() override {
1021 req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
1022 key, owner, owner_display_name, versioned, versioned_epoch,
31f18b77 1023 delete_marker, del_if_older, timestamp, zones_trace);
7c673cae
FG
1024 async_rados->queue(req);
1025 return 0;
1026 }
1027
1028 int request_complete() override {
1029 return req->get_ret_status();
1030 }
1031};
1032
1033class RGWContinuousLeaseCR : public RGWCoroutine {
1034 RGWAsyncRadosProcessor *async_rados;
1035 RGWRados *store;
1036
1037 const rgw_raw_obj obj;
1038
1039 const string lock_name;
1040 const string cookie;
1041
1042 int interval;
1043
1044 Mutex lock;
1045 std::atomic<bool> going_down = { false };
1046 bool locked{false};
1047
1048 RGWCoroutine *caller;
1049
1050 bool aborted{false};
1051
1052public:
1053 RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1054 const rgw_raw_obj& _obj,
1055 const string& _lock_name, int _interval, RGWCoroutine *_caller)
1056 : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
1057 obj(_obj), lock_name(_lock_name),
1058 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
1059 interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
1060 {}
1061
1062 int operate() override;
1063
1064 bool is_locked() {
1065 Mutex::Locker l(lock);
1066 return locked;
1067 }
1068
1069 void set_locked(bool status) {
1070 Mutex::Locker l(lock);
1071 locked = status;
1072 }
1073
1074 void go_down() {
1075 going_down = true;
1076 wakeup();
1077 }
1078
1079 void abort() {
1080 aborted = true;
1081 }
1082};
1083
1084class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
1085 RGWRados *store;
1086 list<cls_log_entry> entries;
1087
1088 string oid;
1089
224ce89b 1090 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
7c673cae
FG
1091
1092public:
1093 RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
1094 const cls_log_entry& entry);
7c673cae
FG
1095
1096 int send_request() override;
1097 int request_complete() override;
1098};
1099
1100class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
1101 RGWRados *store;
224ce89b 1102 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
7c673cae
FG
1103 protected:
1104 std::string oid;
1105 real_time start_time;
1106 real_time end_time;
1107 std::string from_marker;
1108 std::string to_marker;
1109
1110 public:
1111 RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
1112 const real_time& start_time, const real_time& end_time,
1113 const std::string& from_marker,
1114 const std::string& to_marker);
7c673cae
FG
1115
1116 int send_request() override;
1117 int request_complete() override;
1118};
1119
1120// wrapper to update last_trim_marker on success
1121class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
1122 CephContext *cct;
1123 std::string *last_trim_marker;
1124 public:
1125 RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
1126 const std::string& to_marker, std::string *last_trim_marker);
1127 int request_complete() override;
1128};
1129
1130class RGWAsyncStatObj : public RGWAsyncRadosRequest {
1131 RGWRados *store;
1132 RGWBucketInfo bucket_info;
1133 rgw_obj obj;
1134 uint64_t *psize;
1135 real_time *pmtime;
1136 uint64_t *pepoch;
1137 RGWObjVersionTracker *objv_tracker;
1138protected:
1139 int _send_request() override;
1140public:
1141 RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store,
1142 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1143 real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
1144 RGWObjVersionTracker *objv_tracker = nullptr)
1145 : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize),
1146 pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
1147};
1148
1149class RGWStatObjCR : public RGWSimpleCoroutine {
1150 RGWRados *store;
1151 RGWAsyncRadosProcessor *async_rados;
1152 RGWBucketInfo bucket_info;
1153 rgw_obj obj;
1154 uint64_t *psize;
1155 real_time *pmtime;
1156 uint64_t *pepoch;
1157 RGWObjVersionTracker *objv_tracker;
1158 RGWAsyncStatObj *req = nullptr;
1159 public:
1160 RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
1161 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1162 real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
1163 RGWObjVersionTracker *objv_tracker = nullptr);
1164 ~RGWStatObjCR() override {
1165 request_cleanup();
1166 }
1167 void request_cleanup() override;
1168
1169 int send_request() override;
1170 int request_complete() override;
1171};
1172
b32b8144
FG
1173/// coroutine wrapper for IoCtx::aio_notify()
1174class RGWRadosNotifyCR : public RGWSimpleCoroutine {
1175 RGWRados *const store;
1176 const rgw_raw_obj obj;
1177 bufferlist request;
1178 const uint64_t timeout_ms;
1179 bufferlist *response;
1180 rgw_rados_ref ref;
1181 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1182
1183public:
1184 RGWRadosNotifyCR(RGWRados *store, const rgw_raw_obj& obj,
1185 bufferlist& request, uint64_t timeout_ms,
1186 bufferlist *response);
1187
1188 int send_request() override;
1189 int request_complete() override;
1190};
1191
7c673cae 1192#endif