]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rados.h
update sources to v12.2.3
[ceph.git] / ceph / src / rgw / rgw_cr_rados.h
1 #ifndef CEPH_RGW_CR_RADOS_H
2 #define CEPH_RGW_CR_RADOS_H
3
4 #include <boost/intrusive_ptr.hpp>
5 #include "include/assert.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_rados.h"
8 #include "common/WorkQueue.h"
9 #include "common/Throttle.h"
10
11 #include <atomic>
12
13 class RGWAsyncRadosRequest : public RefCountedObject {
14 RGWCoroutine *caller;
15 RGWAioCompletionNotifier *notifier;
16
17 int retcode;
18
19 Mutex lock;
20
21 protected:
22 virtual int _send_request() = 0;
23 public:
24 RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0),
25 lock("RGWAsyncRadosRequest::lock") {
26 }
27 ~RGWAsyncRadosRequest() override {
28 if (notifier) {
29 notifier->put();
30 }
31 }
32
33 void send_request() {
34 get();
35 retcode = _send_request();
36 {
37 Mutex::Locker l(lock);
38 if (notifier) {
39 notifier->cb(); // drops its own ref
40 notifier = nullptr;
41 }
42 }
43 put();
44 }
45
46 int get_ret_status() { return retcode; }
47
48 void finish() {
49 {
50 Mutex::Locker l(lock);
51 if (notifier) {
52 // we won't call notifier->cb() to drop its ref, so drop it here
53 notifier->put();
54 notifier = nullptr;
55 }
56 }
57 put();
58 }
59 };
60
61
62 class RGWAsyncRadosProcessor {
63 deque<RGWAsyncRadosRequest *> m_req_queue;
64 std::atomic<bool> going_down = { false };
65 protected:
66 RGWRados *store;
67 ThreadPool m_tp;
68 Throttle req_throttle;
69
70 struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
71 RGWAsyncRadosProcessor *processor;
72 RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
73 : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
74
75 bool _enqueue(RGWAsyncRadosRequest *req) override;
76 void _dequeue(RGWAsyncRadosRequest *req) override {
77 ceph_abort();
78 }
79 bool _empty() override;
80 RGWAsyncRadosRequest *_dequeue() override;
81 using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
82 void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
83 void _dump_queue();
84 void _clear() override {
85 assert(processor->m_req_queue.empty());
86 }
87 } req_wq;
88
89 public:
90 RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
91 ~RGWAsyncRadosProcessor() {}
92 void start();
93 void stop();
94 void handle_request(RGWAsyncRadosRequest *req);
95 void queue(RGWAsyncRadosRequest *req);
96
97 bool is_going_down() {
98 return going_down;
99 }
100 };
101
102
103 class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
104 RGWRados *store;
105 RGWObjectCtx *obj_ctx;
106 RGWRados::SystemObject::Read::GetObjState read_state;
107 RGWObjVersionTracker *objv_tracker;
108 rgw_raw_obj obj;
109 bufferlist *pbl;
110 map<string, bufferlist> *pattrs;
111 off_t ofs;
112 off_t end;
113 protected:
114 int _send_request() override;
115 public:
116 RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
117 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
118 bufferlist *_pbl, off_t _ofs, off_t _end);
119 void set_read_attrs(map<string, bufferlist> *_pattrs) { pattrs = _pattrs; }
120 };
121
122 class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
123 RGWRados *store;
124 RGWObjVersionTracker *objv_tracker;
125 rgw_raw_obj obj;
126 bool exclusive;
127 bufferlist bl;
128
129 protected:
130 int _send_request() override;
131 public:
132 RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
133 RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
134 bool _exclusive, bufferlist& _bl);
135 };
136
137 class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
138 RGWRados *store;
139 RGWObjVersionTracker *objv_tracker;
140 rgw_raw_obj obj;
141 map<string, bufferlist> *attrs;
142
143 protected:
144 int _send_request() override;
145 public:
146 RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
147 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
148 map<string, bufferlist> *_attrs);
149 };
150
151 class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
152 RGWRados *store;
153 rgw_raw_obj obj;
154 string lock_name;
155 string cookie;
156 uint32_t duration_secs;
157
158 protected:
159 int _send_request() override;
160 public:
161 RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
162 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
163 const string& _name, const string& _cookie, uint32_t _duration_secs);
164 };
165
166 class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
167 RGWRados *store;
168 rgw_raw_obj obj;
169 string lock_name;
170 string cookie;
171
172 protected:
173 int _send_request() override;
174 public:
175 RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
176 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
177 const string& _name, const string& _cookie);
178 };
179
180 template <class T>
181 class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
182 RGWAsyncRadosProcessor *async_rados;
183 RGWRados *store;
184 RGWObjectCtx obj_ctx;
185 bufferlist bl;
186
187 rgw_raw_obj obj;
188
189 map<string, bufferlist> *pattrs{nullptr};
190
191 T *result;
192 /// on ENOENT, call handle_data() with an empty object instead of failing
193 const bool empty_on_enoent;
194 RGWObjVersionTracker *objv_tracker;
195
196 RGWAsyncGetSystemObj *req{nullptr};
197
198 public:
199 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
200 const rgw_raw_obj& _obj,
201 T *_result, bool empty_on_enoent = true,
202 RGWObjVersionTracker *objv_tracker = nullptr)
203 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
204 obj_ctx(store), obj(_obj), result(_result),
205 empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
206 ~RGWSimpleRadosReadCR() override {
207 request_cleanup();
208 }
209
210 void request_cleanup() override {
211 if (req) {
212 req->finish();
213 req = NULL;
214 }
215 }
216
217 int send_request() override;
218 int request_complete() override;
219
220 virtual int handle_data(T& data) {
221 return 0;
222 }
223 };
224
225 template <class T>
226 int RGWSimpleRadosReadCR<T>::send_request()
227 {
228 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
229 store, &obj_ctx, objv_tracker,
230 obj,
231 &bl, 0, -1);
232 if (pattrs) {
233 req->set_read_attrs(pattrs);
234 }
235 async_rados->queue(req);
236 return 0;
237 }
238
239 template <class T>
240 int RGWSimpleRadosReadCR<T>::request_complete()
241 {
242 int ret = req->get_ret_status();
243 retcode = ret;
244 if (ret == -ENOENT && empty_on_enoent) {
245 *result = T();
246 } else {
247 if (ret < 0) {
248 return ret;
249 }
250 try {
251 bufferlist::iterator iter = bl.begin();
252 if (iter.end()) {
253 // allow successful reads with empty buffers. ReadSyncStatus coroutines
254 // depend on this to be able to read without locking, because the
255 // cls lock from InitSyncStatus will create an empty object if it didnt
256 // exist
257 *result = T();
258 } else {
259 ::decode(*result, iter);
260 }
261 } catch (buffer::error& err) {
262 return -EIO;
263 }
264 }
265
266 return handle_data(*result);
267 }
268
269 class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
270 RGWAsyncRadosProcessor *async_rados;
271 RGWRados *store;
272 RGWObjectCtx obj_ctx;
273 bufferlist bl;
274
275 rgw_raw_obj obj;
276
277 map<string, bufferlist> *pattrs;
278
279 RGWAsyncGetSystemObj *req;
280
281 public:
282 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
283 const rgw_raw_obj& _obj,
284 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()),
285 async_rados(_async_rados), store(_store),
286 obj_ctx(store),
287 obj(_obj),
288 pattrs(_pattrs),
289 req(NULL) { }
290 ~RGWSimpleRadosReadAttrsCR() override {
291 request_cleanup();
292 }
293
294 void request_cleanup() override {
295 if (req) {
296 req->finish();
297 req = NULL;
298 }
299 }
300
301 int send_request() override;
302 int request_complete() override;
303 };
304
305 template <class T>
306 class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
307 RGWAsyncRadosProcessor *async_rados;
308 RGWRados *store;
309 bufferlist bl;
310
311 rgw_raw_obj obj;
312 RGWObjVersionTracker *objv_tracker;
313
314 RGWAsyncPutSystemObj *req{nullptr};
315
316 public:
317 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
318 const rgw_raw_obj& _obj,
319 const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
320 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
321 store(_store), obj(_obj), objv_tracker(objv_tracker) {
322 ::encode(_data, bl);
323 }
324
325 ~RGWSimpleRadosWriteCR() override {
326 request_cleanup();
327 }
328
329 void request_cleanup() override {
330 if (req) {
331 req->finish();
332 req = NULL;
333 }
334 }
335
336 int send_request() override {
337 req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
338 store, objv_tracker, obj, false, bl);
339 async_rados->queue(req);
340 return 0;
341 }
342
343 int request_complete() override {
344 return req->get_ret_status();
345 }
346 };
347
348 class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
349 RGWAsyncRadosProcessor *async_rados;
350 RGWRados *store;
351
352 rgw_raw_obj obj;
353
354 map<string, bufferlist> attrs;
355
356 RGWAsyncPutSystemObjAttrs *req;
357
358 public:
359 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
360 const rgw_raw_obj& _obj,
361 map<string, bufferlist>& _attrs) : RGWSimpleCoroutine(_store->ctx()),
362 async_rados(_async_rados),
363 store(_store),
364 obj(_obj),
365 attrs(_attrs), req(NULL) {
366 }
367 ~RGWSimpleRadosWriteAttrsCR() override {
368 request_cleanup();
369 }
370
371 void request_cleanup() override {
372 if (req) {
373 req->finish();
374 req = NULL;
375 }
376 }
377
378 int send_request() override {
379 req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
380 store, NULL, obj, &attrs);
381 async_rados->queue(req);
382 return 0;
383 }
384
385 int request_complete() override {
386 return req->get_ret_status();
387 }
388 };
389
390 class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
391 RGWRados *store;
392 map<string, bufferlist> entries;
393
394 rgw_rados_ref ref;
395
396 rgw_raw_obj obj;
397
398 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
399
400 public:
401 RGWRadosSetOmapKeysCR(RGWRados *_store,
402 const rgw_raw_obj& _obj,
403 map<string, bufferlist>& _entries);
404
405 int send_request() override;
406 int request_complete() override;
407 };
408
409 class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
410 RGWRados *store;
411
412 string marker;
413 map<string, bufferlist> *entries;
414 int max_entries;
415
416 int rval;
417 rgw_rados_ref ref;
418
419 rgw_raw_obj obj;
420
421 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
422
423 public:
424 RGWRadosGetOmapKeysCR(RGWRados *_store,
425 const rgw_raw_obj& _obj,
426 const string& _marker,
427 map<string, bufferlist> *_entries, int _max_entries);
428
429 int send_request() override;
430
431 int request_complete() override {
432 return rval;
433 }
434 };
435
436 class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
437 RGWRados *store;
438
439 rgw_rados_ref ref;
440
441 set<string> keys;
442
443 rgw_raw_obj obj;
444
445 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
446
447 public:
448 RGWRadosRemoveOmapKeysCR(RGWRados *_store,
449 const rgw_raw_obj& _obj,
450 const set<string>& _keys);
451
452 int send_request() override;
453
454 int request_complete() override;
455 };
456
457 class RGWRadosRemoveCR : public RGWSimpleCoroutine {
458 RGWRados *store;
459 librados::IoCtx ioctx;
460 const rgw_raw_obj obj;
461 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
462
463 public:
464 RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
465
466 int send_request();
467 int request_complete();
468 };
469
470 class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
471 RGWAsyncRadosProcessor *async_rados;
472 RGWRados *store;
473 string lock_name;
474 string cookie;
475 uint32_t duration;
476
477 rgw_raw_obj obj;
478
479 RGWAsyncLockSystemObj *req;
480
481 public:
482 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
483 const rgw_raw_obj& _obj,
484 const string& _lock_name,
485 const string& _cookie,
486 uint32_t _duration);
487 ~RGWSimpleRadosLockCR() override {
488 request_cleanup();
489 }
490 void request_cleanup() override;
491
492 int send_request() override;
493 int request_complete() override;
494
495 static std::string gen_random_cookie(CephContext* cct) {
496 #define COOKIE_LEN 16
497 char buf[COOKIE_LEN + 1];
498 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
499 return buf;
500 }
501 };
502
503 class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
504 RGWAsyncRadosProcessor *async_rados;
505 RGWRados *store;
506 string lock_name;
507 string cookie;
508
509 rgw_raw_obj obj;
510
511 RGWAsyncUnlockSystemObj *req;
512
513 public:
514 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
515 const rgw_raw_obj& _obj,
516 const string& _lock_name,
517 const string& _cookie);
518 ~RGWSimpleRadosUnlockCR() override {
519 request_cleanup();
520 }
521 void request_cleanup() override;
522
523 int send_request() override;
524 int request_complete() override;
525 };
526
527 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
528
529 class RGWOmapAppend : public RGWConsumerCR<string> {
530 RGWAsyncRadosProcessor *async_rados;
531 RGWRados *store;
532
533 rgw_raw_obj obj;
534
535 bool going_down;
536
537 int num_pending_entries;
538 list<string> pending_entries;
539
540 map<string, bufferlist> entries;
541
542 uint64_t window_size;
543 uint64_t total_entries;
544 public:
545 RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
546 const rgw_raw_obj& _obj,
547 uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
548 int operate() override;
549 void flush_pending();
550 bool append(const string& s);
551 bool finish();
552
553 uint64_t get_total_entries() {
554 return total_entries;
555 }
556
557 const rgw_raw_obj& get_obj() {
558 return obj;
559 }
560 };
561
562 class RGWAsyncWait : public RGWAsyncRadosRequest {
563 CephContext *cct;
564 Mutex *lock;
565 Cond *cond;
566 utime_t interval;
567 protected:
568 int _send_request() override {
569 Mutex::Locker l(*lock);
570 return cond->WaitInterval(*lock, interval);
571 }
572 public:
573 RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
574 Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn),
575 cct(_cct),
576 lock(_lock), cond(_cond), interval(_secs, 0) {}
577
578 void wakeup() {
579 Mutex::Locker l(*lock);
580 cond->Signal();
581 }
582 };
583
584 class RGWWaitCR : public RGWSimpleCoroutine {
585 CephContext *cct;
586 RGWAsyncRadosProcessor *async_rados;
587 Mutex *lock;
588 Cond *cond;
589 int secs;
590
591 RGWAsyncWait *req;
592
593 public:
594 RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
595 Mutex *_lock, Cond *_cond,
596 int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
597 async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
598 }
599 ~RGWWaitCR() override {
600 request_cleanup();
601 }
602
603 void request_cleanup() override {
604 if (req) {
605 wakeup();
606 req->finish();
607 req = NULL;
608 }
609 }
610
611 int send_request() override {
612 req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct, lock, cond, secs);
613 async_rados->queue(req);
614 return 0;
615 }
616
617 int request_complete() override {
618 return req->get_ret_status();
619 }
620
621 void wakeup() {
622 req->wakeup();
623 }
624 };
625
626 class RGWShardedOmapCRManager {
627 RGWAsyncRadosProcessor *async_rados;
628 RGWRados *store;
629 RGWCoroutine *op;
630
631 int num_shards;
632
633 vector<RGWOmapAppend *> shards;
634 public:
635 RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix)
636 : async_rados(_async_rados),
637 store(_store), op(_op), num_shards(_num_shards) {
638 shards.reserve(num_shards);
639 for (int i = 0; i < num_shards; ++i) {
640 char buf[oid_prefix.size() + 16];
641 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
642 RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
643 shard->get();
644 shards.push_back(shard);
645 op->spawn(shard, false);
646 }
647 }
648
649 ~RGWShardedOmapCRManager() {
650 for (auto shard : shards) {
651 shard->put();
652 }
653 }
654
655 bool append(const string& entry, int shard_id) {
656 return shards[shard_id]->append(entry);
657 }
658 bool finish() {
659 bool success = true;
660 for (vector<RGWOmapAppend *>::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
661 success &= ((*iter)->finish() && (!(*iter)->is_error()));
662 }
663 return success;
664 }
665
666 uint64_t get_total_entries(int shard_id) {
667 return shards[shard_id]->get_total_entries();
668 }
669 };
670
671 class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
672 RGWRados *store;
673 const std::string oid;
674 RGWBucketInfo *bucket_info;
675
676 protected:
677 int _send_request() override;
678 public:
679 RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
680 RGWRados *_store, const std::string& oid,
681 RGWBucketInfo *_bucket_info)
682 : RGWAsyncRadosRequest(caller, cn), store(_store),
683 oid(oid), bucket_info(_bucket_info) {}
684 };
685
686 class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
687 RGWAsyncRadosProcessor *async_rados;
688 RGWRados *store;
689 const std::string oid;
690 RGWBucketInfo *bucket_info;
691
692 RGWAsyncGetBucketInstanceInfo *req{nullptr};
693
694 public:
695 // metadata key constructor
696 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
697 const std::string& meta_key, RGWBucketInfo *_bucket_info)
698 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
699 oid(RGW_BUCKET_INSTANCE_MD_PREFIX + meta_key),
700 bucket_info(_bucket_info) {}
701 // rgw_bucket constructor
702 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
703 const rgw_bucket& bucket, RGWBucketInfo *_bucket_info)
704 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
705 oid(RGW_BUCKET_INSTANCE_MD_PREFIX + bucket.get_key(':')),
706 bucket_info(_bucket_info) {}
707 ~RGWGetBucketInstanceInfoCR() override {
708 request_cleanup();
709 }
710 void request_cleanup() override {
711 if (req) {
712 req->finish();
713 req = NULL;
714 }
715 }
716
717 int send_request() override {
718 req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, oid, bucket_info);
719 async_rados->queue(req);
720 return 0;
721 }
722 int request_complete() override {
723 return req->get_ret_status();
724 }
725 };
726
727 class RGWRadosBILogTrimCR : public RGWSimpleCoroutine {
728 RGWRados::BucketShard bs;
729 std::string start_marker;
730 std::string end_marker;
731 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
732 public:
733 RGWRadosBILogTrimCR(RGWRados *store, const RGWBucketInfo& bucket_info,
734 int shard_id, const std::string& start_marker,
735 const std::string& end_marker);
736
737 int send_request() override;
738 int request_complete() override;
739 };
740
741 class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
742 RGWRados *store;
743 string source_zone;
744
745 RGWBucketInfo bucket_info;
746
747 rgw_obj_key key;
748 uint64_t versioned_epoch;
749
750 real_time src_mtime;
751
752 bool copy_if_newer;
753 rgw_zone_set *zones_trace;
754
755 protected:
756 int _send_request() override;
757 public:
758 RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
759 const string& _source_zone,
760 RGWBucketInfo& _bucket_info,
761 const rgw_obj_key& _key,
762 uint64_t _versioned_epoch,
763 bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
764 source_zone(_source_zone),
765 bucket_info(_bucket_info),
766 key(_key),
767 versioned_epoch(_versioned_epoch),
768 copy_if_newer(_if_newer), zones_trace(_zones_trace) {}
769 };
770
771 class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
772 CephContext *cct;
773 RGWAsyncRadosProcessor *async_rados;
774 RGWRados *store;
775 string source_zone;
776
777 RGWBucketInfo bucket_info;
778
779 rgw_obj_key key;
780 uint64_t versioned_epoch;
781
782 real_time src_mtime;
783
784 bool copy_if_newer;
785
786 RGWAsyncFetchRemoteObj *req;
787 rgw_zone_set *zones_trace;
788
789 public:
790 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
791 const string& _source_zone,
792 RGWBucketInfo& _bucket_info,
793 const rgw_obj_key& _key,
794 uint64_t _versioned_epoch,
795 bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
796 async_rados(_async_rados), store(_store),
797 source_zone(_source_zone),
798 bucket_info(_bucket_info),
799 key(_key),
800 versioned_epoch(_versioned_epoch),
801 copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
802
803
804 ~RGWFetchRemoteObjCR() override {
805 request_cleanup();
806 }
807
808 void request_cleanup() override {
809 if (req) {
810 req->finish();
811 req = NULL;
812 }
813 }
814
815 int send_request() override {
816 req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
817 key, versioned_epoch, copy_if_newer, zones_trace);
818 async_rados->queue(req);
819 return 0;
820 }
821
822 int request_complete() override {
823 return req->get_ret_status();
824 }
825 };
826
827 class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
828 RGWRados *store;
829 string source_zone;
830
831 RGWBucketInfo bucket_info;
832
833 rgw_obj_key key;
834
835 ceph::real_time *pmtime;
836 uint64_t *psize;
837 map<string, bufferlist> *pattrs;
838
839 protected:
840 int _send_request() override;
841 public:
842 RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
843 const string& _source_zone,
844 RGWBucketInfo& _bucket_info,
845 const rgw_obj_key& _key,
846 ceph::real_time *_pmtime,
847 uint64_t *_psize,
848 map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
849 source_zone(_source_zone),
850 bucket_info(_bucket_info),
851 key(_key),
852 pmtime(_pmtime),
853 psize(_psize),
854 pattrs(_pattrs) {}
855 };
856
857 class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
858 CephContext *cct;
859 RGWAsyncRadosProcessor *async_rados;
860 RGWRados *store;
861 string source_zone;
862
863 RGWBucketInfo bucket_info;
864
865 rgw_obj_key key;
866
867 ceph::real_time *pmtime;
868 uint64_t *psize;
869 map<string, bufferlist> *pattrs;
870
871 RGWAsyncStatRemoteObj *req;
872
873 public:
874 RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
875 const string& _source_zone,
876 RGWBucketInfo& _bucket_info,
877 const rgw_obj_key& _key,
878 ceph::real_time *_pmtime,
879 uint64_t *_psize,
880 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
881 async_rados(_async_rados), store(_store),
882 source_zone(_source_zone),
883 bucket_info(_bucket_info),
884 key(_key),
885 pmtime(_pmtime),
886 psize(_psize),
887 pattrs(_pattrs),
888 req(NULL) {}
889
890
891 ~RGWStatRemoteObjCR() override {
892 request_cleanup();
893 }
894
895 void request_cleanup() override {
896 if (req) {
897 req->finish();
898 req = NULL;
899 }
900 }
901
902 int send_request() override {
903 req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
904 bucket_info, key, pmtime, psize, pattrs);
905 async_rados->queue(req);
906 return 0;
907 }
908
909 int request_complete() override {
910 return req->get_ret_status();
911 }
912 };
913
914 class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
915 RGWRados *store;
916 string source_zone;
917
918 RGWBucketInfo bucket_info;
919
920 rgw_obj_key key;
921 string owner;
922 string owner_display_name;
923 bool versioned;
924 uint64_t versioned_epoch;
925 string marker_version_id;
926
927 bool del_if_older;
928 ceph::real_time timestamp;
929 rgw_zone_set *zones_trace;
930
931 protected:
932 int _send_request() override;
933 public:
934 RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
935 const string& _source_zone,
936 RGWBucketInfo& _bucket_info,
937 const rgw_obj_key& _key,
938 const string& _owner,
939 const string& _owner_display_name,
940 bool _versioned,
941 uint64_t _versioned_epoch,
942 bool _delete_marker,
943 bool _if_older,
944 real_time& _timestamp,
945 rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
946 source_zone(_source_zone),
947 bucket_info(_bucket_info),
948 key(_key),
949 owner(_owner),
950 owner_display_name(_owner_display_name),
951 versioned(_versioned),
952 versioned_epoch(_versioned_epoch),
953 del_if_older(_if_older),
954 timestamp(_timestamp), zones_trace(_zones_trace) {
955 if (_delete_marker) {
956 marker_version_id = key.instance;
957 }
958 }
959 };
960
961 class RGWRemoveObjCR : public RGWSimpleCoroutine {
962 CephContext *cct;
963 RGWAsyncRadosProcessor *async_rados;
964 RGWRados *store;
965 string source_zone;
966
967 RGWBucketInfo bucket_info;
968
969 rgw_obj_key key;
970 bool versioned;
971 uint64_t versioned_epoch;
972 bool delete_marker;
973 string owner;
974 string owner_display_name;
975
976 bool del_if_older;
977 real_time timestamp;
978
979 RGWAsyncRemoveObj *req;
980
981 rgw_zone_set *zones_trace;
982
983 public:
984 RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
985 const string& _source_zone,
986 RGWBucketInfo& _bucket_info,
987 const rgw_obj_key& _key,
988 bool _versioned,
989 uint64_t _versioned_epoch,
990 string *_owner,
991 string *_owner_display_name,
992 bool _delete_marker,
993 real_time *_timestamp,
994 rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
995 async_rados(_async_rados), store(_store),
996 source_zone(_source_zone),
997 bucket_info(_bucket_info),
998 key(_key),
999 versioned(_versioned),
1000 versioned_epoch(_versioned_epoch),
1001 delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
1002 del_if_older = (_timestamp != NULL);
1003 if (_timestamp) {
1004 timestamp = *_timestamp;
1005 }
1006
1007 if (_owner) {
1008 owner = *_owner;
1009 }
1010
1011 if (_owner_display_name) {
1012 owner_display_name = *_owner_display_name;
1013 }
1014 }
1015 ~RGWRemoveObjCR() override {
1016 request_cleanup();
1017 }
1018
1019 void request_cleanup() override {
1020 if (req) {
1021 req->finish();
1022 req = NULL;
1023 }
1024 }
1025
1026 int send_request() override {
1027 req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
1028 key, owner, owner_display_name, versioned, versioned_epoch,
1029 delete_marker, del_if_older, timestamp, zones_trace);
1030 async_rados->queue(req);
1031 return 0;
1032 }
1033
1034 int request_complete() override {
1035 return req->get_ret_status();
1036 }
1037 };
1038
1039 class RGWContinuousLeaseCR : public RGWCoroutine {
1040 RGWAsyncRadosProcessor *async_rados;
1041 RGWRados *store;
1042
1043 const rgw_raw_obj obj;
1044
1045 const string lock_name;
1046 const string cookie;
1047
1048 int interval;
1049
1050 Mutex lock;
1051 std::atomic<bool> going_down = { false };
1052 bool locked{false};
1053
1054 RGWCoroutine *caller;
1055
1056 bool aborted{false};
1057
1058 public:
1059 RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1060 const rgw_raw_obj& _obj,
1061 const string& _lock_name, int _interval, RGWCoroutine *_caller)
1062 : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
1063 obj(_obj), lock_name(_lock_name),
1064 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
1065 interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
1066 {}
1067
1068 int operate() override;
1069
1070 bool is_locked() {
1071 Mutex::Locker l(lock);
1072 return locked;
1073 }
1074
1075 void set_locked(bool status) {
1076 Mutex::Locker l(lock);
1077 locked = status;
1078 }
1079
1080 void go_down() {
1081 going_down = true;
1082 wakeup();
1083 }
1084
1085 void abort() {
1086 aborted = true;
1087 }
1088 };
1089
1090 class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
1091 RGWRados *store;
1092 list<cls_log_entry> entries;
1093
1094 string oid;
1095
1096 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1097
1098 public:
1099 RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
1100 const cls_log_entry& entry);
1101
1102 int send_request() override;
1103 int request_complete() override;
1104 };
1105
1106 class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
1107 RGWRados *store;
1108 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1109 protected:
1110 std::string oid;
1111 real_time start_time;
1112 real_time end_time;
1113 std::string from_marker;
1114 std::string to_marker;
1115
1116 public:
1117 RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
1118 const real_time& start_time, const real_time& end_time,
1119 const std::string& from_marker,
1120 const std::string& to_marker);
1121
1122 int send_request() override;
1123 int request_complete() override;
1124 };
1125
1126 // wrapper to update last_trim_marker on success
1127 class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
1128 CephContext *cct;
1129 std::string *last_trim_marker;
1130 public:
1131 RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
1132 const std::string& to_marker, std::string *last_trim_marker);
1133 int request_complete() override;
1134 };
1135
1136 class RGWAsyncStatObj : public RGWAsyncRadosRequest {
1137 RGWRados *store;
1138 RGWBucketInfo bucket_info;
1139 rgw_obj obj;
1140 uint64_t *psize;
1141 real_time *pmtime;
1142 uint64_t *pepoch;
1143 RGWObjVersionTracker *objv_tracker;
1144 protected:
1145 int _send_request() override;
1146 public:
1147 RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store,
1148 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1149 real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
1150 RGWObjVersionTracker *objv_tracker = nullptr)
1151 : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize),
1152 pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
1153 };
1154
1155 class RGWStatObjCR : public RGWSimpleCoroutine {
1156 RGWRados *store;
1157 RGWAsyncRadosProcessor *async_rados;
1158 RGWBucketInfo bucket_info;
1159 rgw_obj obj;
1160 uint64_t *psize;
1161 real_time *pmtime;
1162 uint64_t *pepoch;
1163 RGWObjVersionTracker *objv_tracker;
1164 RGWAsyncStatObj *req = nullptr;
1165 public:
1166 RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
1167 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1168 real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
1169 RGWObjVersionTracker *objv_tracker = nullptr);
1170 ~RGWStatObjCR() override {
1171 request_cleanup();
1172 }
1173 void request_cleanup() override;
1174
1175 int send_request() override;
1176 int request_complete() override;
1177 };
1178
1179 /// coroutine wrapper for IoCtx::aio_notify()
1180 class RGWRadosNotifyCR : public RGWSimpleCoroutine {
1181 RGWRados *const store;
1182 const rgw_raw_obj obj;
1183 bufferlist request;
1184 const uint64_t timeout_ms;
1185 bufferlist *response;
1186 rgw_rados_ref ref;
1187 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1188
1189 public:
1190 RGWRadosNotifyCR(RGWRados *store, const rgw_raw_obj& obj,
1191 bufferlist& request, uint64_t timeout_ms,
1192 bufferlist *response);
1193
1194 int send_request() override;
1195 int request_complete() override;
1196 };
1197
1198 #endif