]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rados.h
update sources to v12.1.1
[ceph.git] / ceph / src / rgw / rgw_cr_rados.h
1 #ifndef CEPH_RGW_CR_RADOS_H
2 #define CEPH_RGW_CR_RADOS_H
3
4 #include <boost/intrusive_ptr.hpp>
5 #include "include/assert.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_rados.h"
8 #include "common/WorkQueue.h"
9 #include "common/Throttle.h"
10
11 #include <atomic>
12
13 class RGWAsyncRadosRequest : public RefCountedObject {
14 RGWCoroutine *caller;
15 RGWAioCompletionNotifier *notifier;
16
17 int retcode;
18
19 Mutex lock;
20
21 protected:
22 virtual int _send_request() = 0;
23 public:
24 RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0),
25 lock("RGWAsyncRadosRequest::lock") {
26 }
27 ~RGWAsyncRadosRequest() override {
28 if (notifier) {
29 notifier->put();
30 }
31 }
32
33 void send_request() {
34 get();
35 retcode = _send_request();
36 {
37 Mutex::Locker l(lock);
38 if (notifier) {
39 notifier->cb(); // drops its own ref
40 notifier = nullptr;
41 }
42 }
43 put();
44 }
45
46 int get_ret_status() { return retcode; }
47
48 void finish() {
49 {
50 Mutex::Locker l(lock);
51 if (notifier) {
52 // we won't call notifier->cb() to drop its ref, so drop it here
53 notifier->put();
54 notifier = nullptr;
55 }
56 }
57 put();
58 }
59 };
60
61
62 class RGWAsyncRadosProcessor {
63 deque<RGWAsyncRadosRequest *> m_req_queue;
64 std::atomic<bool> going_down = { false };
65 protected:
66 RGWRados *store;
67 ThreadPool m_tp;
68 Throttle req_throttle;
69
70 struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
71 RGWAsyncRadosProcessor *processor;
72 RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
73 : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
74
75 bool _enqueue(RGWAsyncRadosRequest *req) override;
76 void _dequeue(RGWAsyncRadosRequest *req) override {
77 ceph_abort();
78 }
79 bool _empty() override;
80 RGWAsyncRadosRequest *_dequeue() override;
81 using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
82 void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
83 void _dump_queue();
84 void _clear() override {
85 assert(processor->m_req_queue.empty());
86 }
87 } req_wq;
88
89 public:
90 RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
91 ~RGWAsyncRadosProcessor() {}
92 void start();
93 void stop();
94 void handle_request(RGWAsyncRadosRequest *req);
95 void queue(RGWAsyncRadosRequest *req);
96
97 bool is_going_down() {
98 return going_down;
99 }
100 };
101
102
103 class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
104 RGWRados *store;
105 RGWObjectCtx *obj_ctx;
106 RGWRados::SystemObject::Read::GetObjState read_state;
107 RGWObjVersionTracker *objv_tracker;
108 rgw_raw_obj obj;
109 bufferlist *pbl;
110 map<string, bufferlist> *pattrs;
111 off_t ofs;
112 off_t end;
113 protected:
114 int _send_request() override;
115 public:
116 RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
117 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
118 bufferlist *_pbl, off_t _ofs, off_t _end);
119 void set_read_attrs(map<string, bufferlist> *_pattrs) { pattrs = _pattrs; }
120 };
121
122 class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
123 RGWRados *store;
124 RGWObjVersionTracker *objv_tracker;
125 rgw_raw_obj obj;
126 bool exclusive;
127 bufferlist bl;
128
129 protected:
130 int _send_request() override;
131 public:
132 RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
133 RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
134 bool _exclusive, bufferlist& _bl);
135 };
136
137 class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
138 RGWRados *store;
139 RGWObjVersionTracker *objv_tracker;
140 rgw_raw_obj obj;
141 map<string, bufferlist> *attrs;
142
143 protected:
144 int _send_request() override;
145 public:
146 RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
147 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
148 map<string, bufferlist> *_attrs);
149 };
150
151 class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
152 RGWRados *store;
153 rgw_raw_obj obj;
154 string lock_name;
155 string cookie;
156 uint32_t duration_secs;
157
158 protected:
159 int _send_request() override;
160 public:
161 RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
162 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
163 const string& _name, const string& _cookie, uint32_t _duration_secs);
164 };
165
166 class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
167 RGWRados *store;
168 rgw_raw_obj obj;
169 string lock_name;
170 string cookie;
171
172 protected:
173 int _send_request() override;
174 public:
175 RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
176 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
177 const string& _name, const string& _cookie);
178 };
179
180
181 template <class T>
182 class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
183 RGWAsyncRadosProcessor *async_rados;
184 RGWRados *store;
185 RGWObjectCtx obj_ctx;
186 bufferlist bl;
187
188 rgw_raw_obj obj;
189
190 map<string, bufferlist> *pattrs{nullptr};
191
192 T *result;
193 /// on ENOENT, call handle_data() with an empty object instead of failing
194 const bool empty_on_enoent;
195 RGWObjVersionTracker *objv_tracker;
196
197 RGWAsyncGetSystemObj *req{nullptr};
198
199 public:
200 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
201 const rgw_raw_obj& _obj,
202 T *_result, bool empty_on_enoent = true,
203 RGWObjVersionTracker *objv_tracker = nullptr)
204 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
205 obj_ctx(store), obj(_obj), result(_result),
206 empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
207 ~RGWSimpleRadosReadCR() override {
208 request_cleanup();
209 }
210
211 void request_cleanup() override {
212 if (req) {
213 req->finish();
214 req = NULL;
215 }
216 }
217
218 int send_request() override;
219 int request_complete() override;
220
221 virtual int handle_data(T& data) {
222 return 0;
223 }
224 };
225
226 template <class T>
227 int RGWSimpleRadosReadCR<T>::send_request()
228 {
229 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
230 store, &obj_ctx, objv_tracker,
231 obj,
232 &bl, 0, -1);
233 if (pattrs) {
234 req->set_read_attrs(pattrs);
235 }
236 async_rados->queue(req);
237 return 0;
238 }
239
240 template <class T>
241 int RGWSimpleRadosReadCR<T>::request_complete()
242 {
243 int ret = req->get_ret_status();
244 retcode = ret;
245 if (ret == -ENOENT && empty_on_enoent) {
246 *result = T();
247 } else {
248 if (ret < 0) {
249 return ret;
250 }
251 try {
252 bufferlist::iterator iter = bl.begin();
253 if (iter.end()) {
254 // allow successful reads with empty buffers. ReadSyncStatus coroutines
255 // depend on this to be able to read without locking, because the
256 // cls lock from InitSyncStatus will create an empty object if it didnt
257 // exist
258 *result = T();
259 } else {
260 ::decode(*result, iter);
261 }
262 } catch (buffer::error& err) {
263 return -EIO;
264 }
265 }
266
267 return handle_data(*result);
268 }
269
270 class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
271 RGWAsyncRadosProcessor *async_rados;
272 RGWRados *store;
273 RGWObjectCtx obj_ctx;
274 bufferlist bl;
275
276 rgw_raw_obj obj;
277
278 map<string, bufferlist> *pattrs;
279
280 RGWAsyncGetSystemObj *req;
281
282 public:
283 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
284 const rgw_raw_obj& _obj,
285 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()),
286 async_rados(_async_rados), store(_store),
287 obj_ctx(store),
288 obj(_obj),
289 pattrs(_pattrs),
290 req(NULL) { }
291 ~RGWSimpleRadosReadAttrsCR() override {
292 request_cleanup();
293 }
294
295 void request_cleanup() override {
296 if (req) {
297 req->finish();
298 req = NULL;
299 }
300 }
301
302 int send_request() override;
303 int request_complete() override;
304 };
305
306 template <class T>
307 class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
308 RGWAsyncRadosProcessor *async_rados;
309 RGWRados *store;
310 bufferlist bl;
311
312 rgw_raw_obj obj;
313 RGWObjVersionTracker *objv_tracker;
314
315 RGWAsyncPutSystemObj *req{nullptr};
316
317 public:
318 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
319 const rgw_raw_obj& _obj,
320 const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
321 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
322 store(_store), obj(_obj), objv_tracker(objv_tracker) {
323 ::encode(_data, bl);
324 }
325
326 ~RGWSimpleRadosWriteCR() override {
327 request_cleanup();
328 }
329
330 void request_cleanup() override {
331 if (req) {
332 req->finish();
333 req = NULL;
334 }
335 }
336
337 int send_request() override {
338 req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
339 store, objv_tracker, obj, false, bl);
340 async_rados->queue(req);
341 return 0;
342 }
343
344 int request_complete() override {
345 return req->get_ret_status();
346 }
347 };
348
349 class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
350 RGWAsyncRadosProcessor *async_rados;
351 RGWRados *store;
352
353 rgw_raw_obj obj;
354
355 map<string, bufferlist> attrs;
356
357 RGWAsyncPutSystemObjAttrs *req;
358
359 public:
360 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
361 const rgw_raw_obj& _obj,
362 map<string, bufferlist>& _attrs) : RGWSimpleCoroutine(_store->ctx()),
363 async_rados(_async_rados),
364 store(_store),
365 obj(_obj),
366 attrs(_attrs), req(NULL) {
367 }
368 ~RGWSimpleRadosWriteAttrsCR() override {
369 request_cleanup();
370 }
371
372 void request_cleanup() override {
373 if (req) {
374 req->finish();
375 req = NULL;
376 }
377 }
378
379 int send_request() override {
380 req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
381 store, NULL, obj, &attrs);
382 async_rados->queue(req);
383 return 0;
384 }
385
386 int request_complete() override {
387 return req->get_ret_status();
388 }
389 };
390
391 class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
392 RGWRados *store;
393 map<string, bufferlist> entries;
394
395 rgw_rados_ref ref;
396
397 rgw_raw_obj obj;
398
399 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
400
401 public:
402 RGWRadosSetOmapKeysCR(RGWRados *_store,
403 const rgw_raw_obj& _obj,
404 map<string, bufferlist>& _entries);
405
406 int send_request() override;
407 int request_complete() override;
408 };
409
410 class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
411 RGWRados *store;
412
413 string marker;
414 map<string, bufferlist> *entries;
415 int max_entries;
416
417 int rval;
418 rgw_rados_ref ref;
419
420 rgw_raw_obj obj;
421
422 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
423
424 public:
425 RGWRadosGetOmapKeysCR(RGWRados *_store,
426 const rgw_raw_obj& _obj,
427 const string& _marker,
428 map<string, bufferlist> *_entries, int _max_entries);
429
430 int send_request() override;
431
432 int request_complete() override {
433 return rval;
434 }
435 };
436
437 class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
438 RGWRados *store;
439
440 rgw_rados_ref ref;
441
442 set<string> keys;
443
444 rgw_raw_obj obj;
445
446 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
447
448 public:
449 RGWRadosRemoveOmapKeysCR(RGWRados *_store,
450 const rgw_raw_obj& _obj,
451 const set<string>& _keys);
452
453 int send_request() override;
454
455 int request_complete() override;
456 };
457
458 class RGWRadosRemoveCR : public RGWSimpleCoroutine {
459 RGWRados *store;
460 librados::IoCtx ioctx;
461 const rgw_raw_obj obj;
462 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
463
464 public:
465 RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
466
467 int send_request();
468 int request_complete();
469 };
470
471 class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
472 RGWAsyncRadosProcessor *async_rados;
473 RGWRados *store;
474 string lock_name;
475 string cookie;
476 uint32_t duration;
477
478 rgw_raw_obj obj;
479
480 RGWAsyncLockSystemObj *req;
481
482 public:
483 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
484 const rgw_raw_obj& _obj,
485 const string& _lock_name,
486 const string& _cookie,
487 uint32_t _duration);
488 ~RGWSimpleRadosLockCR() override {
489 request_cleanup();
490 }
491 void request_cleanup() override;
492
493 int send_request() override;
494 int request_complete() override;
495
496 static std::string gen_random_cookie(CephContext* cct) {
497 #define COOKIE_LEN 16
498 char buf[COOKIE_LEN + 1];
499 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
500 return buf;
501 }
502 };
503
504 class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
505 RGWAsyncRadosProcessor *async_rados;
506 RGWRados *store;
507 string lock_name;
508 string cookie;
509
510 rgw_raw_obj obj;
511
512 RGWAsyncUnlockSystemObj *req;
513
514 public:
515 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
516 const rgw_raw_obj& _obj,
517 const string& _lock_name,
518 const string& _cookie);
519 ~RGWSimpleRadosUnlockCR() override {
520 request_cleanup();
521 }
522 void request_cleanup() override;
523
524 int send_request() override;
525 int request_complete() override;
526 };
527
528 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
529
530 class RGWOmapAppend : public RGWConsumerCR<string> {
531 RGWAsyncRadosProcessor *async_rados;
532 RGWRados *store;
533
534 rgw_raw_obj obj;
535
536 bool going_down;
537
538 int num_pending_entries;
539 list<string> pending_entries;
540
541 map<string, bufferlist> entries;
542
543 uint64_t window_size;
544 uint64_t total_entries;
545 public:
546 RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
547 const rgw_raw_obj& _obj,
548 uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
549 int operate() override;
550 void flush_pending();
551 bool append(const string& s);
552 bool finish();
553
554 uint64_t get_total_entries() {
555 return total_entries;
556 }
557
558 const rgw_raw_obj& get_obj() {
559 return obj;
560 }
561 };
562
563 class RGWAsyncWait : public RGWAsyncRadosRequest {
564 CephContext *cct;
565 Mutex *lock;
566 Cond *cond;
567 utime_t interval;
568 protected:
569 int _send_request() override {
570 Mutex::Locker l(*lock);
571 return cond->WaitInterval(*lock, interval);
572 }
573 public:
574 RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
575 Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn),
576 cct(_cct),
577 lock(_lock), cond(_cond), interval(_secs, 0) {}
578
579 void wakeup() {
580 Mutex::Locker l(*lock);
581 cond->Signal();
582 }
583 };
584
585 class RGWWaitCR : public RGWSimpleCoroutine {
586 CephContext *cct;
587 RGWAsyncRadosProcessor *async_rados;
588 Mutex *lock;
589 Cond *cond;
590 int secs;
591
592 RGWAsyncWait *req;
593
594 public:
595 RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
596 Mutex *_lock, Cond *_cond,
597 int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
598 async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
599 }
600 ~RGWWaitCR() override {
601 request_cleanup();
602 }
603
604 void request_cleanup() override {
605 if (req) {
606 wakeup();
607 req->finish();
608 req = NULL;
609 }
610 }
611
612 int send_request() override {
613 req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct, lock, cond, secs);
614 async_rados->queue(req);
615 return 0;
616 }
617
618 int request_complete() override {
619 return req->get_ret_status();
620 }
621
622 void wakeup() {
623 req->wakeup();
624 }
625 };
626
627 class RGWShardedOmapCRManager {
628 RGWAsyncRadosProcessor *async_rados;
629 RGWRados *store;
630 RGWCoroutine *op;
631
632 int num_shards;
633
634 vector<RGWOmapAppend *> shards;
635 public:
636 RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix)
637 : async_rados(_async_rados),
638 store(_store), op(_op), num_shards(_num_shards) {
639 shards.reserve(num_shards);
640 for (int i = 0; i < num_shards; ++i) {
641 char buf[oid_prefix.size() + 16];
642 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
643 RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
644 shard->get();
645 shards.push_back(shard);
646 op->spawn(shard, false);
647 }
648 }
649
650 ~RGWShardedOmapCRManager() {
651 for (auto shard : shards) {
652 shard->put();
653 }
654 }
655
656 bool append(const string& entry, int shard_id) {
657 return shards[shard_id]->append(entry);
658 }
659 bool finish() {
660 bool success = true;
661 for (vector<RGWOmapAppend *>::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
662 success &= ((*iter)->finish() && (!(*iter)->is_error()));
663 }
664 return success;
665 }
666
667 uint64_t get_total_entries(int shard_id) {
668 return shards[shard_id]->get_total_entries();
669 }
670 };
671
672 class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
673 RGWRados *store;
674 rgw_bucket bucket;
675 RGWBucketInfo *bucket_info;
676
677 protected:
678 int _send_request() override;
679 public:
680 RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
681 RGWRados *_store, const rgw_bucket& bucket,
682 RGWBucketInfo *_bucket_info)
683 : RGWAsyncRadosRequest(caller, cn), store(_store),
684 bucket(bucket), bucket_info(_bucket_info) {}
685 };
686
687 class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
688 RGWAsyncRadosProcessor *async_rados;
689 RGWRados *store;
690 rgw_bucket bucket;
691 RGWBucketInfo *bucket_info;
692
693 RGWAsyncGetBucketInstanceInfo *req;
694
695 public:
696 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
697 const rgw_bucket& bucket, RGWBucketInfo *_bucket_info)
698 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
699 bucket(bucket), bucket_info(_bucket_info), req(NULL) {}
700 ~RGWGetBucketInstanceInfoCR() override {
701 request_cleanup();
702 }
703 void request_cleanup() override {
704 if (req) {
705 req->finish();
706 req = NULL;
707 }
708 }
709
710 int send_request() override {
711 req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, bucket, bucket_info);
712 async_rados->queue(req);
713 return 0;
714 }
715 int request_complete() override {
716 return req->get_ret_status();
717 }
718 };
719
720 class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
721 RGWRados *store;
722 string source_zone;
723
724 RGWBucketInfo bucket_info;
725
726 rgw_obj_key key;
727 uint64_t versioned_epoch;
728
729 real_time src_mtime;
730
731 bool copy_if_newer;
732 rgw_zone_set *zones_trace;
733
734 protected:
735 int _send_request() override;
736 public:
737 RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
738 const string& _source_zone,
739 RGWBucketInfo& _bucket_info,
740 const rgw_obj_key& _key,
741 uint64_t _versioned_epoch,
742 bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
743 source_zone(_source_zone),
744 bucket_info(_bucket_info),
745 key(_key),
746 versioned_epoch(_versioned_epoch),
747 copy_if_newer(_if_newer), zones_trace(_zones_trace) {}
748 };
749
750 class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
751 CephContext *cct;
752 RGWAsyncRadosProcessor *async_rados;
753 RGWRados *store;
754 string source_zone;
755
756 RGWBucketInfo bucket_info;
757
758 rgw_obj_key key;
759 uint64_t versioned_epoch;
760
761 real_time src_mtime;
762
763 bool copy_if_newer;
764
765 RGWAsyncFetchRemoteObj *req;
766 rgw_zone_set *zones_trace;
767
768 public:
769 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
770 const string& _source_zone,
771 RGWBucketInfo& _bucket_info,
772 const rgw_obj_key& _key,
773 uint64_t _versioned_epoch,
774 bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
775 async_rados(_async_rados), store(_store),
776 source_zone(_source_zone),
777 bucket_info(_bucket_info),
778 key(_key),
779 versioned_epoch(_versioned_epoch),
780 copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
781
782
783 ~RGWFetchRemoteObjCR() override {
784 request_cleanup();
785 }
786
787 void request_cleanup() override {
788 if (req) {
789 req->finish();
790 req = NULL;
791 }
792 }
793
794 int send_request() override {
795 req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
796 key, versioned_epoch, copy_if_newer, zones_trace);
797 async_rados->queue(req);
798 return 0;
799 }
800
801 int request_complete() override {
802 return req->get_ret_status();
803 }
804 };
805
806 class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
807 RGWRados *store;
808 string source_zone;
809
810 RGWBucketInfo bucket_info;
811
812 rgw_obj_key key;
813
814 ceph::real_time *pmtime;
815 uint64_t *psize;
816 map<string, bufferlist> *pattrs;
817
818 protected:
819 int _send_request() override;
820 public:
821 RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
822 const string& _source_zone,
823 RGWBucketInfo& _bucket_info,
824 const rgw_obj_key& _key,
825 ceph::real_time *_pmtime,
826 uint64_t *_psize,
827 map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
828 source_zone(_source_zone),
829 bucket_info(_bucket_info),
830 key(_key),
831 pmtime(_pmtime),
832 psize(_psize),
833 pattrs(_pattrs) {}
834 };
835
836 class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
837 CephContext *cct;
838 RGWAsyncRadosProcessor *async_rados;
839 RGWRados *store;
840 string source_zone;
841
842 RGWBucketInfo bucket_info;
843
844 rgw_obj_key key;
845
846 ceph::real_time *pmtime;
847 uint64_t *psize;
848 map<string, bufferlist> *pattrs;
849
850 RGWAsyncStatRemoteObj *req;
851
852 public:
853 RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
854 const string& _source_zone,
855 RGWBucketInfo& _bucket_info,
856 const rgw_obj_key& _key,
857 ceph::real_time *_pmtime,
858 uint64_t *_psize,
859 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
860 async_rados(_async_rados), store(_store),
861 source_zone(_source_zone),
862 bucket_info(_bucket_info),
863 key(_key),
864 pmtime(_pmtime),
865 psize(_psize),
866 pattrs(_pattrs),
867 req(NULL) {}
868
869
870 ~RGWStatRemoteObjCR() override {
871 request_cleanup();
872 }
873
874 void request_cleanup() override {
875 if (req) {
876 req->finish();
877 req = NULL;
878 }
879 }
880
881 int send_request() override {
882 req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
883 bucket_info, key, pmtime, psize, pattrs);
884 async_rados->queue(req);
885 return 0;
886 }
887
888 int request_complete() override {
889 return req->get_ret_status();
890 }
891 };
892
893 class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
894 RGWRados *store;
895 string source_zone;
896
897 RGWBucketInfo bucket_info;
898
899 rgw_obj_key key;
900 string owner;
901 string owner_display_name;
902 bool versioned;
903 uint64_t versioned_epoch;
904 string marker_version_id;
905
906 bool del_if_older;
907 ceph::real_time timestamp;
908 rgw_zone_set *zones_trace;
909
910 protected:
911 int _send_request() override;
912 public:
913 RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
914 const string& _source_zone,
915 RGWBucketInfo& _bucket_info,
916 const rgw_obj_key& _key,
917 const string& _owner,
918 const string& _owner_display_name,
919 bool _versioned,
920 uint64_t _versioned_epoch,
921 bool _delete_marker,
922 bool _if_older,
923 real_time& _timestamp,
924 rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
925 source_zone(_source_zone),
926 bucket_info(_bucket_info),
927 key(_key),
928 owner(_owner),
929 owner_display_name(_owner_display_name),
930 versioned(_versioned),
931 versioned_epoch(_versioned_epoch),
932 del_if_older(_if_older),
933 timestamp(_timestamp), zones_trace(_zones_trace) {
934 if (_delete_marker) {
935 marker_version_id = key.instance;
936 }
937 }
938 };
939
940 class RGWRemoveObjCR : public RGWSimpleCoroutine {
941 CephContext *cct;
942 RGWAsyncRadosProcessor *async_rados;
943 RGWRados *store;
944 string source_zone;
945
946 RGWBucketInfo bucket_info;
947
948 rgw_obj_key key;
949 bool versioned;
950 uint64_t versioned_epoch;
951 bool delete_marker;
952 string owner;
953 string owner_display_name;
954
955 bool del_if_older;
956 real_time timestamp;
957
958 RGWAsyncRemoveObj *req;
959
960 rgw_zone_set *zones_trace;
961
962 public:
963 RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
964 const string& _source_zone,
965 RGWBucketInfo& _bucket_info,
966 const rgw_obj_key& _key,
967 bool _versioned,
968 uint64_t _versioned_epoch,
969 string *_owner,
970 string *_owner_display_name,
971 bool _delete_marker,
972 real_time *_timestamp,
973 rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
974 async_rados(_async_rados), store(_store),
975 source_zone(_source_zone),
976 bucket_info(_bucket_info),
977 key(_key),
978 versioned(_versioned),
979 versioned_epoch(_versioned_epoch),
980 delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
981 del_if_older = (_timestamp != NULL);
982 if (_timestamp) {
983 timestamp = *_timestamp;
984 }
985
986 if (_owner) {
987 owner = *_owner;
988 }
989
990 if (_owner_display_name) {
991 owner_display_name = *_owner_display_name;
992 }
993 }
994 ~RGWRemoveObjCR() override {
995 request_cleanup();
996 }
997
998 void request_cleanup() override {
999 if (req) {
1000 req->finish();
1001 req = NULL;
1002 }
1003 }
1004
1005 int send_request() override {
1006 req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
1007 key, owner, owner_display_name, versioned, versioned_epoch,
1008 delete_marker, del_if_older, timestamp, zones_trace);
1009 async_rados->queue(req);
1010 return 0;
1011 }
1012
1013 int request_complete() override {
1014 return req->get_ret_status();
1015 }
1016 };
1017
1018 class RGWContinuousLeaseCR : public RGWCoroutine {
1019 RGWAsyncRadosProcessor *async_rados;
1020 RGWRados *store;
1021
1022 const rgw_raw_obj obj;
1023
1024 const string lock_name;
1025 const string cookie;
1026
1027 int interval;
1028
1029 Mutex lock;
1030 std::atomic<bool> going_down = { false };
1031 bool locked{false};
1032
1033 RGWCoroutine *caller;
1034
1035 bool aborted{false};
1036
1037 public:
1038 RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1039 const rgw_raw_obj& _obj,
1040 const string& _lock_name, int _interval, RGWCoroutine *_caller)
1041 : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
1042 obj(_obj), lock_name(_lock_name),
1043 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
1044 interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
1045 {}
1046
1047 int operate() override;
1048
1049 bool is_locked() {
1050 Mutex::Locker l(lock);
1051 return locked;
1052 }
1053
1054 void set_locked(bool status) {
1055 Mutex::Locker l(lock);
1056 locked = status;
1057 }
1058
1059 void go_down() {
1060 going_down = true;
1061 wakeup();
1062 }
1063
1064 void abort() {
1065 aborted = true;
1066 }
1067 };
1068
1069 class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
1070 RGWRados *store;
1071 list<cls_log_entry> entries;
1072
1073 string oid;
1074
1075 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1076
1077 public:
1078 RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
1079 const cls_log_entry& entry);
1080
1081 int send_request() override;
1082 int request_complete() override;
1083 };
1084
1085 class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
1086 RGWRados *store;
1087 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1088 protected:
1089 std::string oid;
1090 real_time start_time;
1091 real_time end_time;
1092 std::string from_marker;
1093 std::string to_marker;
1094
1095 public:
1096 RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
1097 const real_time& start_time, const real_time& end_time,
1098 const std::string& from_marker,
1099 const std::string& to_marker);
1100
1101 int send_request() override;
1102 int request_complete() override;
1103 };
1104
1105 // wrapper to update last_trim_marker on success
1106 class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
1107 CephContext *cct;
1108 std::string *last_trim_marker;
1109 public:
1110 RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
1111 const std::string& to_marker, std::string *last_trim_marker);
1112 int request_complete() override;
1113 };
1114
1115 class RGWAsyncStatObj : public RGWAsyncRadosRequest {
1116 RGWRados *store;
1117 RGWBucketInfo bucket_info;
1118 rgw_obj obj;
1119 uint64_t *psize;
1120 real_time *pmtime;
1121 uint64_t *pepoch;
1122 RGWObjVersionTracker *objv_tracker;
1123 protected:
1124 int _send_request() override;
1125 public:
1126 RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store,
1127 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1128 real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
1129 RGWObjVersionTracker *objv_tracker = nullptr)
1130 : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize),
1131 pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
1132 };
1133
1134 class RGWStatObjCR : public RGWSimpleCoroutine {
1135 RGWRados *store;
1136 RGWAsyncRadosProcessor *async_rados;
1137 RGWBucketInfo bucket_info;
1138 rgw_obj obj;
1139 uint64_t *psize;
1140 real_time *pmtime;
1141 uint64_t *pepoch;
1142 RGWObjVersionTracker *objv_tracker;
1143 RGWAsyncStatObj *req = nullptr;
1144 public:
1145 RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
1146 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1147 real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
1148 RGWObjVersionTracker *objv_tracker = nullptr);
1149 ~RGWStatObjCR() override {
1150 request_cleanup();
1151 }
1152 void request_cleanup() override;
1153
1154 int send_request() override;
1155 int request_complete() override;
1156 };
1157
1158 #endif