]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_cr_rados.h
update sources to 12.2.7
[ceph.git] / ceph / src / rgw / rgw_cr_rados.h
CommitLineData
7c673cae
FG
1#ifndef CEPH_RGW_CR_RADOS_H
2#define CEPH_RGW_CR_RADOS_H
3
4#include <boost/intrusive_ptr.hpp>
5#include "include/assert.h"
6#include "rgw_coroutine.h"
7#include "rgw_rados.h"
8#include "common/WorkQueue.h"
9#include "common/Throttle.h"
10
11#include <atomic>
12
13class RGWAsyncRadosRequest : public RefCountedObject {
14 RGWCoroutine *caller;
15 RGWAioCompletionNotifier *notifier;
16
17 int retcode;
18
19 Mutex lock;
20
21protected:
22 virtual int _send_request() = 0;
23public:
24 RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0),
25 lock("RGWAsyncRadosRequest::lock") {
26 }
27 ~RGWAsyncRadosRequest() override {
28 if (notifier) {
29 notifier->put();
30 }
31 }
32
33 void send_request() {
34 get();
35 retcode = _send_request();
36 {
37 Mutex::Locker l(lock);
38 if (notifier) {
39 notifier->cb(); // drops its own ref
40 notifier = nullptr;
41 }
42 }
43 put();
44 }
45
46 int get_ret_status() { return retcode; }
47
48 void finish() {
49 {
50 Mutex::Locker l(lock);
51 if (notifier) {
52 // we won't call notifier->cb() to drop its ref, so drop it here
53 notifier->put();
54 notifier = nullptr;
55 }
56 }
57 put();
58 }
59};
60
61
62class RGWAsyncRadosProcessor {
63 deque<RGWAsyncRadosRequest *> m_req_queue;
64 std::atomic<bool> going_down = { false };
65protected:
66 RGWRados *store;
67 ThreadPool m_tp;
68 Throttle req_throttle;
69
70 struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
71 RGWAsyncRadosProcessor *processor;
72 RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
73 : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
74
75 bool _enqueue(RGWAsyncRadosRequest *req) override;
76 void _dequeue(RGWAsyncRadosRequest *req) override {
77 ceph_abort();
78 }
79 bool _empty() override;
80 RGWAsyncRadosRequest *_dequeue() override;
81 using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
82 void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
83 void _dump_queue();
84 void _clear() override {
85 assert(processor->m_req_queue.empty());
86 }
87 } req_wq;
88
89public:
90 RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
91 ~RGWAsyncRadosProcessor() {}
92 void start();
93 void stop();
94 void handle_request(RGWAsyncRadosRequest *req);
95 void queue(RGWAsyncRadosRequest *req);
96
97 bool is_going_down() {
98 return going_down;
99 }
100};
101
102
103class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
104 RGWRados *store;
105 RGWObjectCtx *obj_ctx;
106 RGWRados::SystemObject::Read::GetObjState read_state;
107 RGWObjVersionTracker *objv_tracker;
108 rgw_raw_obj obj;
109 bufferlist *pbl;
110 map<string, bufferlist> *pattrs;
111 off_t ofs;
112 off_t end;
113protected:
114 int _send_request() override;
115public:
116 RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
117 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
118 bufferlist *_pbl, off_t _ofs, off_t _end);
119 void set_read_attrs(map<string, bufferlist> *_pattrs) { pattrs = _pattrs; }
120};
121
122class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
123 RGWRados *store;
124 RGWObjVersionTracker *objv_tracker;
125 rgw_raw_obj obj;
126 bool exclusive;
127 bufferlist bl;
128
129protected:
130 int _send_request() override;
131public:
132 RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
133 RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
134 bool _exclusive, bufferlist& _bl);
135};
136
137class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
138 RGWRados *store;
139 RGWObjVersionTracker *objv_tracker;
140 rgw_raw_obj obj;
141 map<string, bufferlist> *attrs;
142
143protected:
144 int _send_request() override;
145public:
146 RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
147 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
148 map<string, bufferlist> *_attrs);
149};
150
151class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
152 RGWRados *store;
153 rgw_raw_obj obj;
154 string lock_name;
155 string cookie;
156 uint32_t duration_secs;
157
158protected:
159 int _send_request() override;
160public:
161 RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
162 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
163 const string& _name, const string& _cookie, uint32_t _duration_secs);
164};
165
166class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
167 RGWRados *store;
168 rgw_raw_obj obj;
169 string lock_name;
170 string cookie;
171
172protected:
173 int _send_request() override;
174public:
175 RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
176 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
177 const string& _name, const string& _cookie);
178};
179
7c673cae
FG
180template <class T>
181class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
182 RGWAsyncRadosProcessor *async_rados;
183 RGWRados *store;
184 RGWObjectCtx obj_ctx;
185 bufferlist bl;
186
187 rgw_raw_obj obj;
188
189 map<string, bufferlist> *pattrs{nullptr};
190
191 T *result;
192 /// on ENOENT, call handle_data() with an empty object instead of failing
193 const bool empty_on_enoent;
194 RGWObjVersionTracker *objv_tracker;
195
196 RGWAsyncGetSystemObj *req{nullptr};
197
198public:
199 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
200 const rgw_raw_obj& _obj,
201 T *_result, bool empty_on_enoent = true,
202 RGWObjVersionTracker *objv_tracker = nullptr)
203 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
204 obj_ctx(store), obj(_obj), result(_result),
205 empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
206 ~RGWSimpleRadosReadCR() override {
207 request_cleanup();
208 }
209
210 void request_cleanup() override {
211 if (req) {
212 req->finish();
213 req = NULL;
214 }
215 }
216
217 int send_request() override;
218 int request_complete() override;
219
220 virtual int handle_data(T& data) {
221 return 0;
222 }
223};
224
225template <class T>
226int RGWSimpleRadosReadCR<T>::send_request()
227{
228 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
229 store, &obj_ctx, objv_tracker,
230 obj,
231 &bl, 0, -1);
232 if (pattrs) {
233 req->set_read_attrs(pattrs);
234 }
235 async_rados->queue(req);
236 return 0;
237}
238
239template <class T>
240int RGWSimpleRadosReadCR<T>::request_complete()
241{
242 int ret = req->get_ret_status();
243 retcode = ret;
244 if (ret == -ENOENT && empty_on_enoent) {
245 *result = T();
246 } else {
247 if (ret < 0) {
248 return ret;
249 }
250 try {
251 bufferlist::iterator iter = bl.begin();
252 if (iter.end()) {
253 // allow successful reads with empty buffers. ReadSyncStatus coroutines
254 // depend on this to be able to read without locking, because the
255 // cls lock from InitSyncStatus will create an empty object if it didnt
256 // exist
257 *result = T();
258 } else {
259 ::decode(*result, iter);
260 }
261 } catch (buffer::error& err) {
262 return -EIO;
263 }
264 }
265
266 return handle_data(*result);
267}
268
269class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
270 RGWAsyncRadosProcessor *async_rados;
271 RGWRados *store;
272 RGWObjectCtx obj_ctx;
273 bufferlist bl;
274
275 rgw_raw_obj obj;
276
277 map<string, bufferlist> *pattrs;
278
279 RGWAsyncGetSystemObj *req;
280
281public:
282 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
283 const rgw_raw_obj& _obj,
284 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()),
285 async_rados(_async_rados), store(_store),
286 obj_ctx(store),
287 obj(_obj),
288 pattrs(_pattrs),
289 req(NULL) { }
290 ~RGWSimpleRadosReadAttrsCR() override {
291 request_cleanup();
292 }
293
294 void request_cleanup() override {
295 if (req) {
296 req->finish();
297 req = NULL;
298 }
299 }
300
301 int send_request() override;
302 int request_complete() override;
303};
304
305template <class T>
306class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
307 RGWAsyncRadosProcessor *async_rados;
308 RGWRados *store;
309 bufferlist bl;
310
311 rgw_raw_obj obj;
312 RGWObjVersionTracker *objv_tracker;
313
314 RGWAsyncPutSystemObj *req{nullptr};
315
316public:
317 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
318 const rgw_raw_obj& _obj,
319 const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
320 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
321 store(_store), obj(_obj), objv_tracker(objv_tracker) {
322 ::encode(_data, bl);
323 }
324
325 ~RGWSimpleRadosWriteCR() override {
326 request_cleanup();
327 }
328
329 void request_cleanup() override {
330 if (req) {
331 req->finish();
332 req = NULL;
333 }
334 }
335
336 int send_request() override {
337 req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
338 store, objv_tracker, obj, false, bl);
339 async_rados->queue(req);
340 return 0;
341 }
342
343 int request_complete() override {
344 return req->get_ret_status();
345 }
346};
347
348class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
349 RGWAsyncRadosProcessor *async_rados;
350 RGWRados *store;
351
352 rgw_raw_obj obj;
353
354 map<string, bufferlist> attrs;
355
356 RGWAsyncPutSystemObjAttrs *req;
357
358public:
359 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
360 const rgw_raw_obj& _obj,
361 map<string, bufferlist>& _attrs) : RGWSimpleCoroutine(_store->ctx()),
362 async_rados(_async_rados),
363 store(_store),
364 obj(_obj),
365 attrs(_attrs), req(NULL) {
366 }
367 ~RGWSimpleRadosWriteAttrsCR() override {
368 request_cleanup();
369 }
370
371 void request_cleanup() override {
372 if (req) {
373 req->finish();
374 req = NULL;
375 }
376 }
377
378 int send_request() override {
379 req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
380 store, NULL, obj, &attrs);
381 async_rados->queue(req);
382 return 0;
383 }
384
385 int request_complete() override {
386 return req->get_ret_status();
387 }
388};
389
390class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
391 RGWRados *store;
392 map<string, bufferlist> entries;
393
394 rgw_rados_ref ref;
395
396 rgw_raw_obj obj;
397
224ce89b 398 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
7c673cae
FG
399
400public:
401 RGWRadosSetOmapKeysCR(RGWRados *_store,
402 const rgw_raw_obj& _obj,
403 map<string, bufferlist>& _entries);
404
7c673cae
FG
405 int send_request() override;
406 int request_complete() override;
407};
408
409class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
410 RGWRados *store;
411
412 string marker;
28e407b8 413 std::set<std::string> *entries;
7c673cae
FG
414 int max_entries;
415
7c673cae
FG
416 rgw_rados_ref ref;
417
418 rgw_raw_obj obj;
419
224ce89b 420 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
7c673cae
FG
421
422public:
423 RGWRadosGetOmapKeysCR(RGWRados *_store,
424 const rgw_raw_obj& _obj,
425 const string& _marker,
28e407b8 426 std::set<std::string> *_entries, int _max_entries);
7c673cae 427
7c673cae 428 int send_request() override;
28e407b8 429 int request_complete() override;
7c673cae
FG
430};
431
432class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
433 RGWRados *store;
434
7c673cae
FG
435 rgw_rados_ref ref;
436
437 set<string> keys;
438
439 rgw_raw_obj obj;
440
224ce89b 441 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
7c673cae
FG
442
443public:
444 RGWRadosRemoveOmapKeysCR(RGWRados *_store,
445 const rgw_raw_obj& _obj,
446 const set<string>& _keys);
447
7c673cae
FG
448 int send_request() override;
449
224ce89b 450 int request_complete() override;
7c673cae
FG
451};
452
453class RGWRadosRemoveCR : public RGWSimpleCoroutine {
454 RGWRados *store;
455 librados::IoCtx ioctx;
456 const rgw_raw_obj obj;
457 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
458
459public:
460 RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
461
462 int send_request();
463 int request_complete();
464};
465
466class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
467 RGWAsyncRadosProcessor *async_rados;
468 RGWRados *store;
469 string lock_name;
470 string cookie;
471 uint32_t duration;
472
473 rgw_raw_obj obj;
474
475 RGWAsyncLockSystemObj *req;
476
477public:
478 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
479 const rgw_raw_obj& _obj,
480 const string& _lock_name,
481 const string& _cookie,
482 uint32_t _duration);
483 ~RGWSimpleRadosLockCR() override {
484 request_cleanup();
485 }
486 void request_cleanup() override;
487
488 int send_request() override;
489 int request_complete() override;
490
491 static std::string gen_random_cookie(CephContext* cct) {
492#define COOKIE_LEN 16
493 char buf[COOKIE_LEN + 1];
494 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
495 return buf;
496 }
497};
498
499class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
500 RGWAsyncRadosProcessor *async_rados;
501 RGWRados *store;
502 string lock_name;
503 string cookie;
504
505 rgw_raw_obj obj;
506
507 RGWAsyncUnlockSystemObj *req;
508
509public:
510 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
511 const rgw_raw_obj& _obj,
512 const string& _lock_name,
513 const string& _cookie);
514 ~RGWSimpleRadosUnlockCR() override {
515 request_cleanup();
516 }
517 void request_cleanup() override;
518
519 int send_request() override;
520 int request_complete() override;
521};
522
523#define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
524
525class RGWOmapAppend : public RGWConsumerCR<string> {
526 RGWAsyncRadosProcessor *async_rados;
527 RGWRados *store;
528
529 rgw_raw_obj obj;
530
531 bool going_down;
532
533 int num_pending_entries;
534 list<string> pending_entries;
535
536 map<string, bufferlist> entries;
537
538 uint64_t window_size;
539 uint64_t total_entries;
540public:
541 RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
542 const rgw_raw_obj& _obj,
543 uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
544 int operate() override;
545 void flush_pending();
546 bool append(const string& s);
547 bool finish();
548
549 uint64_t get_total_entries() {
550 return total_entries;
551 }
552
553 const rgw_raw_obj& get_obj() {
554 return obj;
555 }
556};
557
558class RGWAsyncWait : public RGWAsyncRadosRequest {
559 CephContext *cct;
560 Mutex *lock;
561 Cond *cond;
562 utime_t interval;
563protected:
564 int _send_request() override {
565 Mutex::Locker l(*lock);
566 return cond->WaitInterval(*lock, interval);
567 }
568public:
569 RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
570 Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn),
571 cct(_cct),
572 lock(_lock), cond(_cond), interval(_secs, 0) {}
573
574 void wakeup() {
575 Mutex::Locker l(*lock);
576 cond->Signal();
577 }
578};
579
580class RGWWaitCR : public RGWSimpleCoroutine {
581 CephContext *cct;
582 RGWAsyncRadosProcessor *async_rados;
583 Mutex *lock;
584 Cond *cond;
585 int secs;
586
587 RGWAsyncWait *req;
588
589public:
590 RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
591 Mutex *_lock, Cond *_cond,
592 int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
593 async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
594 }
595 ~RGWWaitCR() override {
596 request_cleanup();
597 }
598
599 void request_cleanup() override {
600 if (req) {
601 wakeup();
602 req->finish();
603 req = NULL;
604 }
605 }
606
607 int send_request() override {
608 req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct, lock, cond, secs);
609 async_rados->queue(req);
610 return 0;
611 }
612
613 int request_complete() override {
614 return req->get_ret_status();
615 }
616
617 void wakeup() {
618 req->wakeup();
619 }
620};
621
622class RGWShardedOmapCRManager {
623 RGWAsyncRadosProcessor *async_rados;
624 RGWRados *store;
625 RGWCoroutine *op;
626
627 int num_shards;
628
629 vector<RGWOmapAppend *> shards;
630public:
631 RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix)
632 : async_rados(_async_rados),
633 store(_store), op(_op), num_shards(_num_shards) {
634 shards.reserve(num_shards);
635 for (int i = 0; i < num_shards; ++i) {
636 char buf[oid_prefix.size() + 16];
637 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
638 RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
639 shard->get();
640 shards.push_back(shard);
641 op->spawn(shard, false);
642 }
643 }
644
645 ~RGWShardedOmapCRManager() {
646 for (auto shard : shards) {
647 shard->put();
648 }
649 }
650
651 bool append(const string& entry, int shard_id) {
652 return shards[shard_id]->append(entry);
653 }
654 bool finish() {
655 bool success = true;
656 for (vector<RGWOmapAppend *>::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
657 success &= ((*iter)->finish() && (!(*iter)->is_error()));
658 }
659 return success;
660 }
661
662 uint64_t get_total_entries(int shard_id) {
663 return shards[shard_id]->get_total_entries();
664 }
665};
666
667class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
668 RGWRados *store;
b32b8144 669 const std::string oid;
7c673cae
FG
670 RGWBucketInfo *bucket_info;
671
672protected:
673 int _send_request() override;
674public:
675 RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
b32b8144 676 RGWRados *_store, const std::string& oid,
7c673cae
FG
677 RGWBucketInfo *_bucket_info)
678 : RGWAsyncRadosRequest(caller, cn), store(_store),
b32b8144 679 oid(oid), bucket_info(_bucket_info) {}
7c673cae
FG
680};
681
682class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
683 RGWAsyncRadosProcessor *async_rados;
684 RGWRados *store;
b32b8144 685 const std::string oid;
7c673cae
FG
686 RGWBucketInfo *bucket_info;
687
b32b8144 688 RGWAsyncGetBucketInstanceInfo *req{nullptr};
7c673cae
FG
689
690public:
b32b8144
FG
691 // metadata key constructor
692 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
693 const std::string& meta_key, RGWBucketInfo *_bucket_info)
694 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
695 oid(RGW_BUCKET_INSTANCE_MD_PREFIX + meta_key),
696 bucket_info(_bucket_info) {}
697 // rgw_bucket constructor
7c673cae
FG
698 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
699 const rgw_bucket& bucket, RGWBucketInfo *_bucket_info)
700 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
b32b8144
FG
701 oid(RGW_BUCKET_INSTANCE_MD_PREFIX + bucket.get_key(':')),
702 bucket_info(_bucket_info) {}
7c673cae
FG
703 ~RGWGetBucketInstanceInfoCR() override {
704 request_cleanup();
705 }
706 void request_cleanup() override {
707 if (req) {
708 req->finish();
709 req = NULL;
710 }
711 }
712
713 int send_request() override {
b32b8144 714 req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, oid, bucket_info);
7c673cae
FG
715 async_rados->queue(req);
716 return 0;
717 }
718 int request_complete() override {
719 return req->get_ret_status();
720 }
721};
722
b32b8144
FG
723class RGWRadosBILogTrimCR : public RGWSimpleCoroutine {
724 RGWRados::BucketShard bs;
725 std::string start_marker;
726 std::string end_marker;
727 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
728 public:
729 RGWRadosBILogTrimCR(RGWRados *store, const RGWBucketInfo& bucket_info,
730 int shard_id, const std::string& start_marker,
731 const std::string& end_marker);
732
733 int send_request() override;
734 int request_complete() override;
735};
736
7c673cae
FG
737class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
738 RGWRados *store;
739 string source_zone;
740
741 RGWBucketInfo bucket_info;
742
743 rgw_obj_key key;
744 uint64_t versioned_epoch;
745
746 real_time src_mtime;
747
748 bool copy_if_newer;
31f18b77 749 rgw_zone_set *zones_trace;
7c673cae
FG
750
751protected:
752 int _send_request() override;
753public:
754 RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
755 const string& _source_zone,
756 RGWBucketInfo& _bucket_info,
757 const rgw_obj_key& _key,
758 uint64_t _versioned_epoch,
31f18b77 759 bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
7c673cae
FG
760 source_zone(_source_zone),
761 bucket_info(_bucket_info),
762 key(_key),
763 versioned_epoch(_versioned_epoch),
31f18b77 764 copy_if_newer(_if_newer), zones_trace(_zones_trace) {}
7c673cae
FG
765};
766
767class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
768 CephContext *cct;
769 RGWAsyncRadosProcessor *async_rados;
770 RGWRados *store;
771 string source_zone;
772
773 RGWBucketInfo bucket_info;
774
775 rgw_obj_key key;
776 uint64_t versioned_epoch;
777
778 real_time src_mtime;
779
780 bool copy_if_newer;
781
782 RGWAsyncFetchRemoteObj *req;
31f18b77 783 rgw_zone_set *zones_trace;
7c673cae
FG
784
785public:
786 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
787 const string& _source_zone,
788 RGWBucketInfo& _bucket_info,
789 const rgw_obj_key& _key,
790 uint64_t _versioned_epoch,
31f18b77 791 bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
7c673cae
FG
792 async_rados(_async_rados), store(_store),
793 source_zone(_source_zone),
794 bucket_info(_bucket_info),
795 key(_key),
796 versioned_epoch(_versioned_epoch),
31f18b77 797 copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
7c673cae
FG
798
799
800 ~RGWFetchRemoteObjCR() override {
801 request_cleanup();
802 }
803
804 void request_cleanup() override {
805 if (req) {
806 req->finish();
807 req = NULL;
808 }
809 }
810
811 int send_request() override {
812 req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
31f18b77 813 key, versioned_epoch, copy_if_newer, zones_trace);
7c673cae
FG
814 async_rados->queue(req);
815 return 0;
816 }
817
818 int request_complete() override {
819 return req->get_ret_status();
820 }
821};
822
823class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
824 RGWRados *store;
825 string source_zone;
826
827 RGWBucketInfo bucket_info;
828
829 rgw_obj_key key;
830
831 ceph::real_time *pmtime;
832 uint64_t *psize;
833 map<string, bufferlist> *pattrs;
834
835protected:
836 int _send_request() override;
837public:
838 RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
839 const string& _source_zone,
840 RGWBucketInfo& _bucket_info,
841 const rgw_obj_key& _key,
842 ceph::real_time *_pmtime,
843 uint64_t *_psize,
844 map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
845 source_zone(_source_zone),
846 bucket_info(_bucket_info),
847 key(_key),
848 pmtime(_pmtime),
849 psize(_psize),
850 pattrs(_pattrs) {}
851};
852
853class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
854 CephContext *cct;
855 RGWAsyncRadosProcessor *async_rados;
856 RGWRados *store;
857 string source_zone;
858
859 RGWBucketInfo bucket_info;
860
861 rgw_obj_key key;
862
863 ceph::real_time *pmtime;
864 uint64_t *psize;
865 map<string, bufferlist> *pattrs;
866
867 RGWAsyncStatRemoteObj *req;
868
869public:
870 RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
871 const string& _source_zone,
872 RGWBucketInfo& _bucket_info,
873 const rgw_obj_key& _key,
874 ceph::real_time *_pmtime,
875 uint64_t *_psize,
876 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
877 async_rados(_async_rados), store(_store),
878 source_zone(_source_zone),
879 bucket_info(_bucket_info),
880 key(_key),
881 pmtime(_pmtime),
882 psize(_psize),
883 pattrs(_pattrs),
884 req(NULL) {}
885
886
887 ~RGWStatRemoteObjCR() override {
888 request_cleanup();
889 }
890
891 void request_cleanup() override {
892 if (req) {
893 req->finish();
894 req = NULL;
895 }
896 }
897
898 int send_request() override {
899 req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
900 bucket_info, key, pmtime, psize, pattrs);
901 async_rados->queue(req);
902 return 0;
903 }
904
905 int request_complete() override {
906 return req->get_ret_status();
907 }
908};
909
910class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
911 RGWRados *store;
912 string source_zone;
913
914 RGWBucketInfo bucket_info;
915
916 rgw_obj_key key;
917 string owner;
918 string owner_display_name;
919 bool versioned;
920 uint64_t versioned_epoch;
921 string marker_version_id;
922
923 bool del_if_older;
924 ceph::real_time timestamp;
31f18b77 925 rgw_zone_set *zones_trace;
7c673cae
FG
926
927protected:
928 int _send_request() override;
929public:
930 RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
931 const string& _source_zone,
932 RGWBucketInfo& _bucket_info,
933 const rgw_obj_key& _key,
934 const string& _owner,
935 const string& _owner_display_name,
936 bool _versioned,
937 uint64_t _versioned_epoch,
938 bool _delete_marker,
939 bool _if_older,
31f18b77
FG
940 real_time& _timestamp,
941 rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
7c673cae
FG
942 source_zone(_source_zone),
943 bucket_info(_bucket_info),
944 key(_key),
945 owner(_owner),
946 owner_display_name(_owner_display_name),
947 versioned(_versioned),
948 versioned_epoch(_versioned_epoch),
949 del_if_older(_if_older),
31f18b77 950 timestamp(_timestamp), zones_trace(_zones_trace) {
7c673cae
FG
951 if (_delete_marker) {
952 marker_version_id = key.instance;
953 }
954 }
955};
956
957class RGWRemoveObjCR : public RGWSimpleCoroutine {
958 CephContext *cct;
959 RGWAsyncRadosProcessor *async_rados;
960 RGWRados *store;
961 string source_zone;
962
963 RGWBucketInfo bucket_info;
964
965 rgw_obj_key key;
966 bool versioned;
967 uint64_t versioned_epoch;
968 bool delete_marker;
969 string owner;
970 string owner_display_name;
971
972 bool del_if_older;
973 real_time timestamp;
974
975 RGWAsyncRemoveObj *req;
31f18b77
FG
976
977 rgw_zone_set *zones_trace;
7c673cae
FG
978
979public:
980 RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
981 const string& _source_zone,
982 RGWBucketInfo& _bucket_info,
983 const rgw_obj_key& _key,
984 bool _versioned,
985 uint64_t _versioned_epoch,
986 string *_owner,
987 string *_owner_display_name,
988 bool _delete_marker,
31f18b77
FG
989 real_time *_timestamp,
990 rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
7c673cae
FG
991 async_rados(_async_rados), store(_store),
992 source_zone(_source_zone),
993 bucket_info(_bucket_info),
994 key(_key),
995 versioned(_versioned),
996 versioned_epoch(_versioned_epoch),
31f18b77 997 delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
7c673cae
FG
998 del_if_older = (_timestamp != NULL);
999 if (_timestamp) {
1000 timestamp = *_timestamp;
1001 }
1002
1003 if (_owner) {
1004 owner = *_owner;
1005 }
1006
1007 if (_owner_display_name) {
1008 owner_display_name = *_owner_display_name;
1009 }
1010 }
1011 ~RGWRemoveObjCR() override {
1012 request_cleanup();
1013 }
1014
1015 void request_cleanup() override {
1016 if (req) {
1017 req->finish();
1018 req = NULL;
1019 }
1020 }
1021
1022 int send_request() override {
1023 req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
1024 key, owner, owner_display_name, versioned, versioned_epoch,
31f18b77 1025 delete_marker, del_if_older, timestamp, zones_trace);
7c673cae
FG
1026 async_rados->queue(req);
1027 return 0;
1028 }
1029
1030 int request_complete() override {
1031 return req->get_ret_status();
1032 }
1033};
1034
1035class RGWContinuousLeaseCR : public RGWCoroutine {
1036 RGWAsyncRadosProcessor *async_rados;
1037 RGWRados *store;
1038
1039 const rgw_raw_obj obj;
1040
1041 const string lock_name;
1042 const string cookie;
1043
1044 int interval;
1045
1046 Mutex lock;
1047 std::atomic<bool> going_down = { false };
1048 bool locked{false};
1049
1050 RGWCoroutine *caller;
1051
1052 bool aborted{false};
1053
1054public:
1055 RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1056 const rgw_raw_obj& _obj,
1057 const string& _lock_name, int _interval, RGWCoroutine *_caller)
1058 : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
1059 obj(_obj), lock_name(_lock_name),
1060 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
1061 interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
1062 {}
1063
1064 int operate() override;
1065
1066 bool is_locked() {
1067 Mutex::Locker l(lock);
1068 return locked;
1069 }
1070
1071 void set_locked(bool status) {
1072 Mutex::Locker l(lock);
1073 locked = status;
1074 }
1075
1076 void go_down() {
1077 going_down = true;
1078 wakeup();
1079 }
1080
1081 void abort() {
1082 aborted = true;
1083 }
1084};
1085
1086class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
1087 RGWRados *store;
1088 list<cls_log_entry> entries;
1089
1090 string oid;
1091
224ce89b 1092 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
7c673cae
FG
1093
1094public:
1095 RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
1096 const cls_log_entry& entry);
7c673cae
FG
1097
1098 int send_request() override;
1099 int request_complete() override;
1100};
1101
1102class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
1103 RGWRados *store;
224ce89b 1104 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
7c673cae
FG
1105 protected:
1106 std::string oid;
1107 real_time start_time;
1108 real_time end_time;
1109 std::string from_marker;
1110 std::string to_marker;
1111
1112 public:
1113 RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
1114 const real_time& start_time, const real_time& end_time,
1115 const std::string& from_marker,
1116 const std::string& to_marker);
7c673cae
FG
1117
1118 int send_request() override;
1119 int request_complete() override;
1120};
1121
1122// wrapper to update last_trim_marker on success
1123class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
1124 CephContext *cct;
1125 std::string *last_trim_marker;
1126 public:
1127 RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
1128 const std::string& to_marker, std::string *last_trim_marker);
1129 int request_complete() override;
1130};
1131
1132class RGWAsyncStatObj : public RGWAsyncRadosRequest {
1133 RGWRados *store;
1134 RGWBucketInfo bucket_info;
1135 rgw_obj obj;
1136 uint64_t *psize;
1137 real_time *pmtime;
1138 uint64_t *pepoch;
1139 RGWObjVersionTracker *objv_tracker;
1140protected:
1141 int _send_request() override;
1142public:
1143 RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store,
1144 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1145 real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
1146 RGWObjVersionTracker *objv_tracker = nullptr)
1147 : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize),
1148 pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
1149};
1150
1151class RGWStatObjCR : public RGWSimpleCoroutine {
1152 RGWRados *store;
1153 RGWAsyncRadosProcessor *async_rados;
1154 RGWBucketInfo bucket_info;
1155 rgw_obj obj;
1156 uint64_t *psize;
1157 real_time *pmtime;
1158 uint64_t *pepoch;
1159 RGWObjVersionTracker *objv_tracker;
1160 RGWAsyncStatObj *req = nullptr;
1161 public:
1162 RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
1163 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1164 real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
1165 RGWObjVersionTracker *objv_tracker = nullptr);
1166 ~RGWStatObjCR() override {
1167 request_cleanup();
1168 }
1169 void request_cleanup() override;
1170
1171 int send_request() override;
1172 int request_complete() override;
1173};
1174
b32b8144
FG
1175/// coroutine wrapper for IoCtx::aio_notify()
1176class RGWRadosNotifyCR : public RGWSimpleCoroutine {
1177 RGWRados *const store;
1178 const rgw_raw_obj obj;
1179 bufferlist request;
1180 const uint64_t timeout_ms;
1181 bufferlist *response;
1182 rgw_rados_ref ref;
1183 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1184
1185public:
1186 RGWRadosNotifyCR(RGWRados *store, const rgw_raw_obj& obj,
1187 bufferlist& request, uint64_t timeout_ms,
1188 bufferlist *response);
1189
1190 int send_request() override;
1191 int request_complete() override;
1192};
1193
7c673cae 1194#endif