]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rados.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rgw / rgw_cr_rados.h
1 #ifndef CEPH_RGW_CR_RADOS_H
2 #define CEPH_RGW_CR_RADOS_H
3
4 #include <boost/intrusive_ptr.hpp>
5 #include "include/assert.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_rados.h"
8 #include "common/WorkQueue.h"
9 #include "common/Throttle.h"
10
11 #include <atomic>
12
13 class RGWAsyncRadosRequest : public RefCountedObject {
14 RGWCoroutine *caller;
15 RGWAioCompletionNotifier *notifier;
16
17 int retcode;
18
19 Mutex lock;
20
21 protected:
22 virtual int _send_request() = 0;
23 public:
24 RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0),
25 lock("RGWAsyncRadosRequest::lock") {
26 }
27 ~RGWAsyncRadosRequest() override {
28 if (notifier) {
29 notifier->put();
30 }
31 }
32
33 void send_request() {
34 get();
35 retcode = _send_request();
36 {
37 Mutex::Locker l(lock);
38 if (notifier) {
39 notifier->cb(); // drops its own ref
40 notifier = nullptr;
41 }
42 }
43 put();
44 }
45
46 int get_ret_status() { return retcode; }
47
48 void finish() {
49 {
50 Mutex::Locker l(lock);
51 if (notifier) {
52 // we won't call notifier->cb() to drop its ref, so drop it here
53 notifier->put();
54 notifier = nullptr;
55 }
56 }
57 put();
58 }
59 };
60
61
62 class RGWAsyncRadosProcessor {
63 deque<RGWAsyncRadosRequest *> m_req_queue;
64 std::atomic<bool> going_down = { false };
65 protected:
66 RGWRados *store;
67 ThreadPool m_tp;
68 Throttle req_throttle;
69
70 struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
71 RGWAsyncRadosProcessor *processor;
72 RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
73 : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
74
75 bool _enqueue(RGWAsyncRadosRequest *req) override;
76 void _dequeue(RGWAsyncRadosRequest *req) override {
77 ceph_abort();
78 }
79 bool _empty() override;
80 RGWAsyncRadosRequest *_dequeue() override;
81 using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
82 void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
83 void _dump_queue();
84 void _clear() override {
85 assert(processor->m_req_queue.empty());
86 }
87 } req_wq;
88
89 public:
90 RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
91 ~RGWAsyncRadosProcessor() {}
92 void start();
93 void stop();
94 void handle_request(RGWAsyncRadosRequest *req);
95 void queue(RGWAsyncRadosRequest *req);
96
97 bool is_going_down() {
98 return going_down;
99 }
100 };
101
102
103 class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
104 RGWRados *store;
105 RGWObjectCtx *obj_ctx;
106 RGWRados::SystemObject::Read::GetObjState read_state;
107 RGWObjVersionTracker *objv_tracker;
108 rgw_raw_obj obj;
109 bufferlist *pbl;
110 map<string, bufferlist> *pattrs;
111 off_t ofs;
112 off_t end;
113 protected:
114 int _send_request() override;
115 public:
116 RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
117 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
118 bufferlist *_pbl, off_t _ofs, off_t _end);
119 void set_read_attrs(map<string, bufferlist> *_pattrs) { pattrs = _pattrs; }
120 };
121
122 class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
123 RGWRados *store;
124 RGWObjVersionTracker *objv_tracker;
125 rgw_raw_obj obj;
126 bool exclusive;
127 bufferlist bl;
128
129 protected:
130 int _send_request() override;
131 public:
132 RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
133 RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
134 bool _exclusive, bufferlist& _bl);
135 };
136
137 class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
138 RGWRados *store;
139 RGWObjVersionTracker *objv_tracker;
140 rgw_raw_obj obj;
141 map<string, bufferlist> *attrs;
142
143 protected:
144 int _send_request() override;
145 public:
146 RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
147 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
148 map<string, bufferlist> *_attrs);
149 };
150
151 class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
152 RGWRados *store;
153 rgw_raw_obj obj;
154 string lock_name;
155 string cookie;
156 uint32_t duration_secs;
157
158 protected:
159 int _send_request() override;
160 public:
161 RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
162 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
163 const string& _name, const string& _cookie, uint32_t _duration_secs);
164 };
165
166 class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
167 RGWRados *store;
168 rgw_raw_obj obj;
169 string lock_name;
170 string cookie;
171
172 protected:
173 int _send_request() override;
174 public:
175 RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
176 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
177 const string& _name, const string& _cookie);
178 };
179
180
181 template <class T>
182 class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
183 RGWAsyncRadosProcessor *async_rados;
184 RGWRados *store;
185 RGWObjectCtx obj_ctx;
186 bufferlist bl;
187
188 rgw_raw_obj obj;
189
190 map<string, bufferlist> *pattrs{nullptr};
191
192 T *result;
193 /// on ENOENT, call handle_data() with an empty object instead of failing
194 const bool empty_on_enoent;
195 RGWObjVersionTracker *objv_tracker;
196
197 RGWAsyncGetSystemObj *req{nullptr};
198
199 public:
200 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
201 const rgw_raw_obj& _obj,
202 T *_result, bool empty_on_enoent = true,
203 RGWObjVersionTracker *objv_tracker = nullptr)
204 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
205 obj_ctx(store), obj(_obj), result(_result),
206 empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
207 ~RGWSimpleRadosReadCR() override {
208 request_cleanup();
209 }
210
211 void request_cleanup() override {
212 if (req) {
213 req->finish();
214 req = NULL;
215 }
216 }
217
218 int send_request() override;
219 int request_complete() override;
220
221 virtual int handle_data(T& data) {
222 return 0;
223 }
224 };
225
226 template <class T>
227 int RGWSimpleRadosReadCR<T>::send_request()
228 {
229 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
230 store, &obj_ctx, objv_tracker,
231 obj,
232 &bl, 0, -1);
233 if (pattrs) {
234 req->set_read_attrs(pattrs);
235 }
236 async_rados->queue(req);
237 return 0;
238 }
239
240 template <class T>
241 int RGWSimpleRadosReadCR<T>::request_complete()
242 {
243 int ret = req->get_ret_status();
244 retcode = ret;
245 if (ret == -ENOENT && empty_on_enoent) {
246 *result = T();
247 } else {
248 if (ret < 0) {
249 return ret;
250 }
251 try {
252 bufferlist::iterator iter = bl.begin();
253 if (iter.end()) {
254 // allow successful reads with empty buffers. ReadSyncStatus coroutines
255 // depend on this to be able to read without locking, because the
256 // cls lock from InitSyncStatus will create an empty object if it didnt
257 // exist
258 *result = T();
259 } else {
260 ::decode(*result, iter);
261 }
262 } catch (buffer::error& err) {
263 return -EIO;
264 }
265 }
266
267 return handle_data(*result);
268 }
269
270 class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
271 RGWAsyncRadosProcessor *async_rados;
272 RGWRados *store;
273 RGWObjectCtx obj_ctx;
274 bufferlist bl;
275
276 rgw_raw_obj obj;
277
278 map<string, bufferlist> *pattrs;
279
280 RGWAsyncGetSystemObj *req;
281
282 public:
283 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
284 const rgw_raw_obj& _obj,
285 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()),
286 async_rados(_async_rados), store(_store),
287 obj_ctx(store),
288 obj(_obj),
289 pattrs(_pattrs),
290 req(NULL) { }
291 ~RGWSimpleRadosReadAttrsCR() override {
292 request_cleanup();
293 }
294
295 void request_cleanup() override {
296 if (req) {
297 req->finish();
298 req = NULL;
299 }
300 }
301
302 int send_request() override;
303 int request_complete() override;
304 };
305
306 template <class T>
307 class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
308 RGWAsyncRadosProcessor *async_rados;
309 RGWRados *store;
310 bufferlist bl;
311
312 rgw_raw_obj obj;
313 RGWObjVersionTracker *objv_tracker;
314
315 RGWAsyncPutSystemObj *req{nullptr};
316
317 public:
318 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
319 const rgw_raw_obj& _obj,
320 const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
321 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
322 store(_store), obj(_obj), objv_tracker(objv_tracker) {
323 ::encode(_data, bl);
324 }
325
326 ~RGWSimpleRadosWriteCR() override {
327 request_cleanup();
328 }
329
330 void request_cleanup() override {
331 if (req) {
332 req->finish();
333 req = NULL;
334 }
335 }
336
337 int send_request() override {
338 req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
339 store, objv_tracker, obj, false, bl);
340 async_rados->queue(req);
341 return 0;
342 }
343
344 int request_complete() override {
345 return req->get_ret_status();
346 }
347 };
348
349 class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
350 RGWAsyncRadosProcessor *async_rados;
351 RGWRados *store;
352
353 rgw_raw_obj obj;
354
355 map<string, bufferlist> attrs;
356
357 RGWAsyncPutSystemObjAttrs *req;
358
359 public:
360 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
361 const rgw_raw_obj& _obj,
362 map<string, bufferlist>& _attrs) : RGWSimpleCoroutine(_store->ctx()),
363 async_rados(_async_rados),
364 store(_store),
365 obj(_obj),
366 attrs(_attrs), req(NULL) {
367 }
368 ~RGWSimpleRadosWriteAttrsCR() override {
369 request_cleanup();
370 }
371
372 void request_cleanup() override {
373 if (req) {
374 req->finish();
375 req = NULL;
376 }
377 }
378
379 int send_request() override {
380 req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
381 store, NULL, obj, &attrs);
382 async_rados->queue(req);
383 return 0;
384 }
385
386 int request_complete() override {
387 return req->get_ret_status();
388 }
389 };
390
391 class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
392 RGWRados *store;
393 map<string, bufferlist> entries;
394
395 rgw_rados_ref ref;
396
397 rgw_raw_obj obj;
398
399 RGWAioCompletionNotifier *cn;
400
401 public:
402 RGWRadosSetOmapKeysCR(RGWRados *_store,
403 const rgw_raw_obj& _obj,
404 map<string, bufferlist>& _entries);
405
406 ~RGWRadosSetOmapKeysCR() override;
407
408 int send_request() override;
409 int request_complete() override;
410 };
411
412 class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
413 RGWRados *store;
414
415 string marker;
416 map<string, bufferlist> *entries;
417 int max_entries;
418
419 int rval;
420 rgw_rados_ref ref;
421
422 rgw_raw_obj obj;
423
424 RGWAioCompletionNotifier *cn;
425
426 public:
427 RGWRadosGetOmapKeysCR(RGWRados *_store,
428 const rgw_raw_obj& _obj,
429 const string& _marker,
430 map<string, bufferlist> *_entries, int _max_entries);
431
432 ~RGWRadosGetOmapKeysCR() override;
433
434 int send_request() override;
435
436 int request_complete() override {
437 return rval;
438 }
439 };
440
441 class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
442 RGWRados *store;
443
444 string marker;
445 map<string, bufferlist> *entries;
446 int max_entries;
447
448 int rval;
449 rgw_rados_ref ref;
450
451 set<string> keys;
452
453 rgw_raw_obj obj;
454
455 RGWAioCompletionNotifier *cn;
456
457 public:
458 RGWRadosRemoveOmapKeysCR(RGWRados *_store,
459 const rgw_raw_obj& _obj,
460 const set<string>& _keys);
461
462 ~RGWRadosRemoveOmapKeysCR() override;
463
464 int send_request() override;
465
466 int request_complete() override {
467 return rval;
468 }
469 };
470
471 class RGWRadosRemoveCR : public RGWSimpleCoroutine {
472 RGWRados *store;
473 librados::IoCtx ioctx;
474 const rgw_raw_obj obj;
475 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
476
477 public:
478 RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
479
480 int send_request();
481 int request_complete();
482 };
483
484 class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
485 RGWAsyncRadosProcessor *async_rados;
486 RGWRados *store;
487 string lock_name;
488 string cookie;
489 uint32_t duration;
490
491 rgw_raw_obj obj;
492
493 RGWAsyncLockSystemObj *req;
494
495 public:
496 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
497 const rgw_raw_obj& _obj,
498 const string& _lock_name,
499 const string& _cookie,
500 uint32_t _duration);
501 ~RGWSimpleRadosLockCR() override {
502 request_cleanup();
503 }
504 void request_cleanup() override;
505
506 int send_request() override;
507 int request_complete() override;
508
509 static std::string gen_random_cookie(CephContext* cct) {
510 #define COOKIE_LEN 16
511 char buf[COOKIE_LEN + 1];
512 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
513 return buf;
514 }
515 };
516
517 class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
518 RGWAsyncRadosProcessor *async_rados;
519 RGWRados *store;
520 string lock_name;
521 string cookie;
522
523 rgw_raw_obj obj;
524
525 RGWAsyncUnlockSystemObj *req;
526
527 public:
528 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
529 const rgw_raw_obj& _obj,
530 const string& _lock_name,
531 const string& _cookie);
532 ~RGWSimpleRadosUnlockCR() override {
533 request_cleanup();
534 }
535 void request_cleanup() override;
536
537 int send_request() override;
538 int request_complete() override;
539 };
540
541 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
542
543 class RGWOmapAppend : public RGWConsumerCR<string> {
544 RGWAsyncRadosProcessor *async_rados;
545 RGWRados *store;
546
547 rgw_raw_obj obj;
548
549 bool going_down;
550
551 int num_pending_entries;
552 list<string> pending_entries;
553
554 map<string, bufferlist> entries;
555
556 uint64_t window_size;
557 uint64_t total_entries;
558 public:
559 RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
560 const rgw_raw_obj& _obj,
561 uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
562 int operate() override;
563 void flush_pending();
564 bool append(const string& s);
565 bool finish();
566
567 uint64_t get_total_entries() {
568 return total_entries;
569 }
570
571 const rgw_raw_obj& get_obj() {
572 return obj;
573 }
574 };
575
576 class RGWAsyncWait : public RGWAsyncRadosRequest {
577 CephContext *cct;
578 Mutex *lock;
579 Cond *cond;
580 utime_t interval;
581 protected:
582 int _send_request() override {
583 Mutex::Locker l(*lock);
584 return cond->WaitInterval(*lock, interval);
585 }
586 public:
587 RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
588 Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn),
589 cct(_cct),
590 lock(_lock), cond(_cond), interval(_secs, 0) {}
591
592 void wakeup() {
593 Mutex::Locker l(*lock);
594 cond->Signal();
595 }
596 };
597
598 class RGWWaitCR : public RGWSimpleCoroutine {
599 CephContext *cct;
600 RGWAsyncRadosProcessor *async_rados;
601 Mutex *lock;
602 Cond *cond;
603 int secs;
604
605 RGWAsyncWait *req;
606
607 public:
608 RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
609 Mutex *_lock, Cond *_cond,
610 int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
611 async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
612 }
613 ~RGWWaitCR() override {
614 request_cleanup();
615 }
616
617 void request_cleanup() override {
618 if (req) {
619 wakeup();
620 req->finish();
621 req = NULL;
622 }
623 }
624
625 int send_request() override {
626 req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct, lock, cond, secs);
627 async_rados->queue(req);
628 return 0;
629 }
630
631 int request_complete() override {
632 return req->get_ret_status();
633 }
634
635 void wakeup() {
636 req->wakeup();
637 }
638 };
639
640 class RGWShardedOmapCRManager {
641 RGWAsyncRadosProcessor *async_rados;
642 RGWRados *store;
643 RGWCoroutine *op;
644
645 int num_shards;
646
647 vector<RGWOmapAppend *> shards;
648 public:
649 RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix)
650 : async_rados(_async_rados),
651 store(_store), op(_op), num_shards(_num_shards) {
652 shards.reserve(num_shards);
653 for (int i = 0; i < num_shards; ++i) {
654 char buf[oid_prefix.size() + 16];
655 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
656 RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
657 shard->get();
658 shards.push_back(shard);
659 op->spawn(shard, false);
660 }
661 }
662
663 ~RGWShardedOmapCRManager() {
664 for (auto shard : shards) {
665 shard->put();
666 }
667 }
668
669 bool append(const string& entry, int shard_id) {
670 return shards[shard_id]->append(entry);
671 }
672 bool finish() {
673 bool success = true;
674 for (vector<RGWOmapAppend *>::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
675 success &= ((*iter)->finish() && (!(*iter)->is_error()));
676 }
677 return success;
678 }
679
680 uint64_t get_total_entries(int shard_id) {
681 return shards[shard_id]->get_total_entries();
682 }
683 };
684
685 class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
686 RGWRados *store;
687 rgw_bucket bucket;
688 RGWBucketInfo *bucket_info;
689
690 protected:
691 int _send_request() override;
692 public:
693 RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
694 RGWRados *_store, const rgw_bucket& bucket,
695 RGWBucketInfo *_bucket_info)
696 : RGWAsyncRadosRequest(caller, cn), store(_store),
697 bucket(bucket), bucket_info(_bucket_info) {}
698 };
699
700 class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
701 RGWAsyncRadosProcessor *async_rados;
702 RGWRados *store;
703 rgw_bucket bucket;
704 RGWBucketInfo *bucket_info;
705
706 RGWAsyncGetBucketInstanceInfo *req;
707
708 public:
709 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
710 const rgw_bucket& bucket, RGWBucketInfo *_bucket_info)
711 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
712 bucket(bucket), bucket_info(_bucket_info), req(NULL) {}
713 ~RGWGetBucketInstanceInfoCR() override {
714 request_cleanup();
715 }
716 void request_cleanup() override {
717 if (req) {
718 req->finish();
719 req = NULL;
720 }
721 }
722
723 int send_request() override {
724 req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, bucket, bucket_info);
725 async_rados->queue(req);
726 return 0;
727 }
728 int request_complete() override {
729 return req->get_ret_status();
730 }
731 };
732
733 class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
734 RGWRados *store;
735 string source_zone;
736
737 RGWBucketInfo bucket_info;
738
739 rgw_obj_key key;
740 uint64_t versioned_epoch;
741
742 real_time src_mtime;
743
744 bool copy_if_newer;
745
746 protected:
747 int _send_request() override;
748 public:
749 RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
750 const string& _source_zone,
751 RGWBucketInfo& _bucket_info,
752 const rgw_obj_key& _key,
753 uint64_t _versioned_epoch,
754 bool _if_newer) : RGWAsyncRadosRequest(caller, cn), store(_store),
755 source_zone(_source_zone),
756 bucket_info(_bucket_info),
757 key(_key),
758 versioned_epoch(_versioned_epoch),
759 copy_if_newer(_if_newer) {}
760 };
761
762 class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
763 CephContext *cct;
764 RGWAsyncRadosProcessor *async_rados;
765 RGWRados *store;
766 string source_zone;
767
768 RGWBucketInfo bucket_info;
769
770 rgw_obj_key key;
771 uint64_t versioned_epoch;
772
773 real_time src_mtime;
774
775 bool copy_if_newer;
776
777 RGWAsyncFetchRemoteObj *req;
778
779 public:
780 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
781 const string& _source_zone,
782 RGWBucketInfo& _bucket_info,
783 const rgw_obj_key& _key,
784 uint64_t _versioned_epoch,
785 bool _if_newer) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
786 async_rados(_async_rados), store(_store),
787 source_zone(_source_zone),
788 bucket_info(_bucket_info),
789 key(_key),
790 versioned_epoch(_versioned_epoch),
791 copy_if_newer(_if_newer), req(NULL) {}
792
793
794 ~RGWFetchRemoteObjCR() override {
795 request_cleanup();
796 }
797
798 void request_cleanup() override {
799 if (req) {
800 req->finish();
801 req = NULL;
802 }
803 }
804
805 int send_request() override {
806 req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
807 key, versioned_epoch, copy_if_newer);
808 async_rados->queue(req);
809 return 0;
810 }
811
812 int request_complete() override {
813 return req->get_ret_status();
814 }
815 };
816
817 class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
818 RGWRados *store;
819 string source_zone;
820
821 RGWBucketInfo bucket_info;
822
823 rgw_obj_key key;
824
825 ceph::real_time *pmtime;
826 uint64_t *psize;
827 map<string, bufferlist> *pattrs;
828
829 protected:
830 int _send_request() override;
831 public:
832 RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
833 const string& _source_zone,
834 RGWBucketInfo& _bucket_info,
835 const rgw_obj_key& _key,
836 ceph::real_time *_pmtime,
837 uint64_t *_psize,
838 map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
839 source_zone(_source_zone),
840 bucket_info(_bucket_info),
841 key(_key),
842 pmtime(_pmtime),
843 psize(_psize),
844 pattrs(_pattrs) {}
845 };
846
847 class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
848 CephContext *cct;
849 RGWAsyncRadosProcessor *async_rados;
850 RGWRados *store;
851 string source_zone;
852
853 RGWBucketInfo bucket_info;
854
855 rgw_obj_key key;
856
857 ceph::real_time *pmtime;
858 uint64_t *psize;
859 map<string, bufferlist> *pattrs;
860
861 RGWAsyncStatRemoteObj *req;
862
863 public:
864 RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
865 const string& _source_zone,
866 RGWBucketInfo& _bucket_info,
867 const rgw_obj_key& _key,
868 ceph::real_time *_pmtime,
869 uint64_t *_psize,
870 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
871 async_rados(_async_rados), store(_store),
872 source_zone(_source_zone),
873 bucket_info(_bucket_info),
874 key(_key),
875 pmtime(_pmtime),
876 psize(_psize),
877 pattrs(_pattrs),
878 req(NULL) {}
879
880
881 ~RGWStatRemoteObjCR() override {
882 request_cleanup();
883 }
884
885 void request_cleanup() override {
886 if (req) {
887 req->finish();
888 req = NULL;
889 }
890 }
891
892 int send_request() override {
893 req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
894 bucket_info, key, pmtime, psize, pattrs);
895 async_rados->queue(req);
896 return 0;
897 }
898
899 int request_complete() override {
900 return req->get_ret_status();
901 }
902 };
903
904 class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
905 RGWRados *store;
906 string source_zone;
907
908 RGWBucketInfo bucket_info;
909
910 rgw_obj_key key;
911 string owner;
912 string owner_display_name;
913 bool versioned;
914 uint64_t versioned_epoch;
915 string marker_version_id;
916
917 bool del_if_older;
918 ceph::real_time timestamp;
919
920 protected:
921 int _send_request() override;
922 public:
923 RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
924 const string& _source_zone,
925 RGWBucketInfo& _bucket_info,
926 const rgw_obj_key& _key,
927 const string& _owner,
928 const string& _owner_display_name,
929 bool _versioned,
930 uint64_t _versioned_epoch,
931 bool _delete_marker,
932 bool _if_older,
933 real_time& _timestamp) : RGWAsyncRadosRequest(caller, cn), store(_store),
934 source_zone(_source_zone),
935 bucket_info(_bucket_info),
936 key(_key),
937 owner(_owner),
938 owner_display_name(_owner_display_name),
939 versioned(_versioned),
940 versioned_epoch(_versioned_epoch),
941 del_if_older(_if_older),
942 timestamp(_timestamp) {
943 if (_delete_marker) {
944 marker_version_id = key.instance;
945 }
946 }
947 };
948
949 class RGWRemoveObjCR : public RGWSimpleCoroutine {
950 CephContext *cct;
951 RGWAsyncRadosProcessor *async_rados;
952 RGWRados *store;
953 string source_zone;
954
955 RGWBucketInfo bucket_info;
956
957 rgw_obj_key key;
958 bool versioned;
959 uint64_t versioned_epoch;
960 bool delete_marker;
961 string owner;
962 string owner_display_name;
963
964 bool del_if_older;
965 real_time timestamp;
966
967 RGWAsyncRemoveObj *req;
968
969 public:
970 RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
971 const string& _source_zone,
972 RGWBucketInfo& _bucket_info,
973 const rgw_obj_key& _key,
974 bool _versioned,
975 uint64_t _versioned_epoch,
976 string *_owner,
977 string *_owner_display_name,
978 bool _delete_marker,
979 real_time *_timestamp) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
980 async_rados(_async_rados), store(_store),
981 source_zone(_source_zone),
982 bucket_info(_bucket_info),
983 key(_key),
984 versioned(_versioned),
985 versioned_epoch(_versioned_epoch),
986 delete_marker(_delete_marker), req(NULL) {
987 del_if_older = (_timestamp != NULL);
988 if (_timestamp) {
989 timestamp = *_timestamp;
990 }
991
992 if (_owner) {
993 owner = *_owner;
994 }
995
996 if (_owner_display_name) {
997 owner_display_name = *_owner_display_name;
998 }
999 }
1000 ~RGWRemoveObjCR() override {
1001 request_cleanup();
1002 }
1003
1004 void request_cleanup() override {
1005 if (req) {
1006 req->finish();
1007 req = NULL;
1008 }
1009 }
1010
1011 int send_request() override {
1012 req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
1013 key, owner, owner_display_name, versioned, versioned_epoch,
1014 delete_marker, del_if_older, timestamp);
1015 async_rados->queue(req);
1016 return 0;
1017 }
1018
1019 int request_complete() override {
1020 return req->get_ret_status();
1021 }
1022 };
1023
1024 class RGWContinuousLeaseCR : public RGWCoroutine {
1025 RGWAsyncRadosProcessor *async_rados;
1026 RGWRados *store;
1027
1028 const rgw_raw_obj obj;
1029
1030 const string lock_name;
1031 const string cookie;
1032
1033 int interval;
1034
1035 Mutex lock;
1036 std::atomic<bool> going_down = { false };
1037 bool locked{false};
1038
1039 RGWCoroutine *caller;
1040
1041 bool aborted{false};
1042
1043 public:
1044 RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1045 const rgw_raw_obj& _obj,
1046 const string& _lock_name, int _interval, RGWCoroutine *_caller)
1047 : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
1048 obj(_obj), lock_name(_lock_name),
1049 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
1050 interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
1051 {}
1052
1053 int operate() override;
1054
1055 bool is_locked() {
1056 Mutex::Locker l(lock);
1057 return locked;
1058 }
1059
1060 void set_locked(bool status) {
1061 Mutex::Locker l(lock);
1062 locked = status;
1063 }
1064
1065 void go_down() {
1066 going_down = true;
1067 wakeup();
1068 }
1069
1070 void abort() {
1071 aborted = true;
1072 }
1073 };
1074
1075 class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
1076 RGWRados *store;
1077 list<cls_log_entry> entries;
1078
1079 string oid;
1080
1081 RGWAioCompletionNotifier *cn;
1082
1083 public:
1084 RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
1085 const cls_log_entry& entry);
1086 ~RGWRadosTimelogAddCR() override;
1087
1088 int send_request() override;
1089 int request_complete() override;
1090 };
1091
1092 class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
1093 RGWRados *store;
1094 RGWAioCompletionNotifier *cn{nullptr};
1095 protected:
1096 std::string oid;
1097 real_time start_time;
1098 real_time end_time;
1099 std::string from_marker;
1100 std::string to_marker;
1101
1102 public:
1103 RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
1104 const real_time& start_time, const real_time& end_time,
1105 const std::string& from_marker,
1106 const std::string& to_marker);
1107 ~RGWRadosTimelogTrimCR() override;
1108
1109 int send_request() override;
1110 int request_complete() override;
1111 };
1112
1113 // wrapper to update last_trim_marker on success
1114 class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
1115 CephContext *cct;
1116 std::string *last_trim_marker;
1117 public:
1118 RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
1119 const std::string& to_marker, std::string *last_trim_marker);
1120 int request_complete() override;
1121 };
1122
1123 class RGWAsyncStatObj : public RGWAsyncRadosRequest {
1124 RGWRados *store;
1125 RGWBucketInfo bucket_info;
1126 rgw_obj obj;
1127 uint64_t *psize;
1128 real_time *pmtime;
1129 uint64_t *pepoch;
1130 RGWObjVersionTracker *objv_tracker;
1131 protected:
1132 int _send_request() override;
1133 public:
1134 RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store,
1135 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1136 real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
1137 RGWObjVersionTracker *objv_tracker = nullptr)
1138 : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize),
1139 pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
1140 };
1141
1142 class RGWStatObjCR : public RGWSimpleCoroutine {
1143 RGWRados *store;
1144 RGWAsyncRadosProcessor *async_rados;
1145 RGWBucketInfo bucket_info;
1146 rgw_obj obj;
1147 uint64_t *psize;
1148 real_time *pmtime;
1149 uint64_t *pepoch;
1150 RGWObjVersionTracker *objv_tracker;
1151 RGWAsyncStatObj *req = nullptr;
1152 public:
1153 RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
1154 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1155 real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
1156 RGWObjVersionTracker *objv_tracker = nullptr);
1157 ~RGWStatObjCR() override {
1158 request_cleanup();
1159 }
1160 void request_cleanup() override;
1161
1162 int send_request() override;
1163 int request_complete() override;
1164 };
1165
1166 #endif