]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rados.h
c75fa5862f02ac3dbbf637344913bb393ec90439
[ceph.git] / ceph / src / rgw / rgw_cr_rados.h
1 #ifndef CEPH_RGW_CR_RADOS_H
2 #define CEPH_RGW_CR_RADOS_H
3
4 #include <boost/intrusive_ptr.hpp>
5 #include "include/assert.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_rados.h"
8 #include "common/WorkQueue.h"
9 #include "common/Throttle.h"
10
11 #include <atomic>
12
13 class RGWAsyncRadosRequest : public RefCountedObject {
14 RGWCoroutine *caller;
15 RGWAioCompletionNotifier *notifier;
16
17 int retcode;
18
19 Mutex lock;
20
21 protected:
22 virtual int _send_request() = 0;
23 public:
24 RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0),
25 lock("RGWAsyncRadosRequest::lock") {
26 }
27 ~RGWAsyncRadosRequest() override {
28 if (notifier) {
29 notifier->put();
30 }
31 }
32
33 void send_request() {
34 get();
35 retcode = _send_request();
36 {
37 Mutex::Locker l(lock);
38 if (notifier) {
39 notifier->cb(); // drops its own ref
40 notifier = nullptr;
41 }
42 }
43 put();
44 }
45
46 int get_ret_status() { return retcode; }
47
48 void finish() {
49 {
50 Mutex::Locker l(lock);
51 if (notifier) {
52 // we won't call notifier->cb() to drop its ref, so drop it here
53 notifier->put();
54 notifier = nullptr;
55 }
56 }
57 put();
58 }
59 };
60
61
62 class RGWAsyncRadosProcessor {
63 deque<RGWAsyncRadosRequest *> m_req_queue;
64 std::atomic<bool> going_down = { false };
65 protected:
66 RGWRados *store;
67 ThreadPool m_tp;
68 Throttle req_throttle;
69
70 struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
71 RGWAsyncRadosProcessor *processor;
72 RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
73 : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
74
75 bool _enqueue(RGWAsyncRadosRequest *req) override;
76 void _dequeue(RGWAsyncRadosRequest *req) override {
77 ceph_abort();
78 }
79 bool _empty() override;
80 RGWAsyncRadosRequest *_dequeue() override;
81 using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
82 void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
83 void _dump_queue();
84 void _clear() override {
85 assert(processor->m_req_queue.empty());
86 }
87 } req_wq;
88
89 public:
90 RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
91 ~RGWAsyncRadosProcessor() {}
92 void start();
93 void stop();
94 void handle_request(RGWAsyncRadosRequest *req);
95 void queue(RGWAsyncRadosRequest *req);
96
97 bool is_going_down() {
98 return going_down;
99 }
100 };
101
102
103 class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
104 RGWRados *store;
105 RGWObjectCtx *obj_ctx;
106 RGWRados::SystemObject::Read::GetObjState read_state;
107 RGWObjVersionTracker *objv_tracker;
108 rgw_raw_obj obj;
109 bufferlist *pbl;
110 map<string, bufferlist> *pattrs;
111 off_t ofs;
112 off_t end;
113 protected:
114 int _send_request() override;
115 public:
116 RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
117 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
118 bufferlist *_pbl, off_t _ofs, off_t _end);
119 void set_read_attrs(map<string, bufferlist> *_pattrs) { pattrs = _pattrs; }
120 };
121
122 class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
123 RGWRados *store;
124 RGWObjVersionTracker *objv_tracker;
125 rgw_raw_obj obj;
126 bool exclusive;
127 bufferlist bl;
128
129 protected:
130 int _send_request() override;
131 public:
132 RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
133 RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
134 bool _exclusive, bufferlist& _bl);
135 };
136
137 class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
138 RGWRados *store;
139 RGWObjVersionTracker *objv_tracker;
140 rgw_raw_obj obj;
141 map<string, bufferlist> *attrs;
142
143 protected:
144 int _send_request() override;
145 public:
146 RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
147 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
148 map<string, bufferlist> *_attrs);
149 };
150
151 class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
152 RGWRados *store;
153 rgw_raw_obj obj;
154 string lock_name;
155 string cookie;
156 uint32_t duration_secs;
157
158 protected:
159 int _send_request() override;
160 public:
161 RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
162 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
163 const string& _name, const string& _cookie, uint32_t _duration_secs);
164 };
165
166 class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
167 RGWRados *store;
168 rgw_raw_obj obj;
169 string lock_name;
170 string cookie;
171
172 protected:
173 int _send_request() override;
174 public:
175 RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
176 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
177 const string& _name, const string& _cookie);
178 };
179
180
181 template <class T>
182 class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
183 RGWAsyncRadosProcessor *async_rados;
184 RGWRados *store;
185 RGWObjectCtx obj_ctx;
186 bufferlist bl;
187
188 rgw_raw_obj obj;
189
190 map<string, bufferlist> *pattrs{nullptr};
191
192 T *result;
193 /// on ENOENT, call handle_data() with an empty object instead of failing
194 const bool empty_on_enoent;
195 RGWObjVersionTracker *objv_tracker;
196
197 RGWAsyncGetSystemObj *req{nullptr};
198
199 public:
200 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
201 const rgw_raw_obj& _obj,
202 T *_result, bool empty_on_enoent = true,
203 RGWObjVersionTracker *objv_tracker = nullptr)
204 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
205 obj_ctx(store), obj(_obj), result(_result),
206 empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
207 ~RGWSimpleRadosReadCR() override {
208 request_cleanup();
209 }
210
211 void request_cleanup() override {
212 if (req) {
213 req->finish();
214 req = NULL;
215 }
216 }
217
218 int send_request() override;
219 int request_complete() override;
220
221 virtual int handle_data(T& data) {
222 return 0;
223 }
224 };
225
226 template <class T>
227 int RGWSimpleRadosReadCR<T>::send_request()
228 {
229 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
230 store, &obj_ctx, objv_tracker,
231 obj,
232 &bl, 0, -1);
233 if (pattrs) {
234 req->set_read_attrs(pattrs);
235 }
236 async_rados->queue(req);
237 return 0;
238 }
239
240 template <class T>
241 int RGWSimpleRadosReadCR<T>::request_complete()
242 {
243 int ret = req->get_ret_status();
244 retcode = ret;
245 if (ret == -ENOENT && empty_on_enoent) {
246 *result = T();
247 } else {
248 if (ret < 0) {
249 return ret;
250 }
251 try {
252 bufferlist::iterator iter = bl.begin();
253 if (iter.end()) {
254 // allow successful reads with empty buffers. ReadSyncStatus coroutines
255 // depend on this to be able to read without locking, because the
256 // cls lock from InitSyncStatus will create an empty object if it didnt
257 // exist
258 *result = T();
259 } else {
260 ::decode(*result, iter);
261 }
262 } catch (buffer::error& err) {
263 return -EIO;
264 }
265 }
266
267 return handle_data(*result);
268 }
269
270 class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
271 RGWAsyncRadosProcessor *async_rados;
272 RGWRados *store;
273 RGWObjectCtx obj_ctx;
274 bufferlist bl;
275
276 rgw_raw_obj obj;
277
278 map<string, bufferlist> *pattrs;
279
280 RGWAsyncGetSystemObj *req;
281
282 public:
283 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
284 const rgw_raw_obj& _obj,
285 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()),
286 async_rados(_async_rados), store(_store),
287 obj_ctx(store),
288 obj(_obj),
289 pattrs(_pattrs),
290 req(NULL) { }
291 ~RGWSimpleRadosReadAttrsCR() override {
292 request_cleanup();
293 }
294
295 void request_cleanup() override {
296 if (req) {
297 req->finish();
298 req = NULL;
299 }
300 }
301
302 int send_request() override;
303 int request_complete() override;
304 };
305
306 template <class T>
307 class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
308 RGWAsyncRadosProcessor *async_rados;
309 RGWRados *store;
310 bufferlist bl;
311
312 rgw_raw_obj obj;
313 RGWObjVersionTracker *objv_tracker;
314
315 RGWAsyncPutSystemObj *req{nullptr};
316
317 public:
318 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
319 const rgw_raw_obj& _obj,
320 const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
321 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
322 store(_store), obj(_obj), objv_tracker(objv_tracker) {
323 ::encode(_data, bl);
324 }
325
326 ~RGWSimpleRadosWriteCR() override {
327 request_cleanup();
328 }
329
330 void request_cleanup() override {
331 if (req) {
332 req->finish();
333 req = NULL;
334 }
335 }
336
337 int send_request() override {
338 req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
339 store, objv_tracker, obj, false, bl);
340 async_rados->queue(req);
341 return 0;
342 }
343
344 int request_complete() override {
345 return req->get_ret_status();
346 }
347 };
348
349 class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
350 RGWAsyncRadosProcessor *async_rados;
351 RGWRados *store;
352
353 rgw_raw_obj obj;
354
355 map<string, bufferlist> attrs;
356
357 RGWAsyncPutSystemObjAttrs *req;
358
359 public:
360 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
361 const rgw_raw_obj& _obj,
362 map<string, bufferlist>& _attrs) : RGWSimpleCoroutine(_store->ctx()),
363 async_rados(_async_rados),
364 store(_store),
365 obj(_obj),
366 attrs(_attrs), req(NULL) {
367 }
368 ~RGWSimpleRadosWriteAttrsCR() override {
369 request_cleanup();
370 }
371
372 void request_cleanup() override {
373 if (req) {
374 req->finish();
375 req = NULL;
376 }
377 }
378
379 int send_request() override {
380 req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
381 store, NULL, obj, &attrs);
382 async_rados->queue(req);
383 return 0;
384 }
385
386 int request_complete() override {
387 return req->get_ret_status();
388 }
389 };
390
391 class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
392 RGWRados *store;
393 map<string, bufferlist> entries;
394
395 rgw_rados_ref ref;
396
397 rgw_raw_obj obj;
398
399 RGWAioCompletionNotifier *cn;
400
401 public:
402 RGWRadosSetOmapKeysCR(RGWRados *_store,
403 const rgw_raw_obj& _obj,
404 map<string, bufferlist>& _entries);
405
406 ~RGWRadosSetOmapKeysCR() override;
407
408 int send_request() override;
409 int request_complete() override;
410 };
411
412 class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
413 RGWRados *store;
414
415 string marker;
416 map<string, bufferlist> *entries;
417 int max_entries;
418
419 int rval;
420 rgw_rados_ref ref;
421
422 rgw_raw_obj obj;
423
424 RGWAioCompletionNotifier *cn;
425
426 public:
427 RGWRadosGetOmapKeysCR(RGWRados *_store,
428 const rgw_raw_obj& _obj,
429 const string& _marker,
430 map<string, bufferlist> *_entries, int _max_entries);
431
432 ~RGWRadosGetOmapKeysCR() override;
433
434 int send_request() override;
435
436 int request_complete() override {
437 return rval;
438 }
439 };
440
441 class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
442 RGWRados *store;
443
444 string marker;
445 map<string, bufferlist> *entries;
446 int max_entries;
447
448 int rval;
449 rgw_rados_ref ref;
450
451 set<string> keys;
452
453 rgw_raw_obj obj;
454
455 RGWAioCompletionNotifier *cn;
456
457 public:
458 RGWRadosRemoveOmapKeysCR(RGWRados *_store,
459 const rgw_raw_obj& _obj,
460 const set<string>& _keys);
461
462 ~RGWRadosRemoveOmapKeysCR() override;
463
464 int send_request() override;
465
466 int request_complete() override {
467 return rval;
468 }
469 };
470
471 class RGWRadosRemoveCR : public RGWSimpleCoroutine {
472 RGWRados *store;
473 librados::IoCtx ioctx;
474 const rgw_raw_obj obj;
475 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
476
477 public:
478 RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
479
480 int send_request();
481 int request_complete();
482 };
483
484 class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
485 RGWAsyncRadosProcessor *async_rados;
486 RGWRados *store;
487 string lock_name;
488 string cookie;
489 uint32_t duration;
490
491 rgw_raw_obj obj;
492
493 RGWAsyncLockSystemObj *req;
494
495 public:
496 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
497 const rgw_raw_obj& _obj,
498 const string& _lock_name,
499 const string& _cookie,
500 uint32_t _duration);
501 ~RGWSimpleRadosLockCR() override {
502 request_cleanup();
503 }
504 void request_cleanup() override;
505
506 int send_request() override;
507 int request_complete() override;
508
509 static std::string gen_random_cookie(CephContext* cct) {
510 #define COOKIE_LEN 16
511 char buf[COOKIE_LEN + 1];
512 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
513 return buf;
514 }
515 };
516
517 class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
518 RGWAsyncRadosProcessor *async_rados;
519 RGWRados *store;
520 string lock_name;
521 string cookie;
522
523 rgw_raw_obj obj;
524
525 RGWAsyncUnlockSystemObj *req;
526
527 public:
528 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
529 const rgw_raw_obj& _obj,
530 const string& _lock_name,
531 const string& _cookie);
532 ~RGWSimpleRadosUnlockCR() override {
533 request_cleanup();
534 }
535 void request_cleanup() override;
536
537 int send_request() override;
538 int request_complete() override;
539 };
540
541 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
542
543 class RGWOmapAppend : public RGWConsumerCR<string> {
544 RGWAsyncRadosProcessor *async_rados;
545 RGWRados *store;
546
547 rgw_raw_obj obj;
548
549 bool going_down;
550
551 int num_pending_entries;
552 list<string> pending_entries;
553
554 map<string, bufferlist> entries;
555
556 uint64_t window_size;
557 uint64_t total_entries;
558 public:
559 RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
560 const rgw_raw_obj& _obj,
561 uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
562 int operate() override;
563 void flush_pending();
564 bool append(const string& s);
565 bool finish();
566
567 uint64_t get_total_entries() {
568 return total_entries;
569 }
570
571 const rgw_raw_obj& get_obj() {
572 return obj;
573 }
574 };
575
576 class RGWAsyncWait : public RGWAsyncRadosRequest {
577 CephContext *cct;
578 Mutex *lock;
579 Cond *cond;
580 utime_t interval;
581 protected:
582 int _send_request() override {
583 Mutex::Locker l(*lock);
584 return cond->WaitInterval(*lock, interval);
585 }
586 public:
587 RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
588 Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn),
589 cct(_cct),
590 lock(_lock), cond(_cond), interval(_secs, 0) {}
591
592 void wakeup() {
593 Mutex::Locker l(*lock);
594 cond->Signal();
595 }
596 };
597
598 class RGWWaitCR : public RGWSimpleCoroutine {
599 CephContext *cct;
600 RGWAsyncRadosProcessor *async_rados;
601 Mutex *lock;
602 Cond *cond;
603 int secs;
604
605 RGWAsyncWait *req;
606
607 public:
608 RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
609 Mutex *_lock, Cond *_cond,
610 int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
611 async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
612 }
613 ~RGWWaitCR() override {
614 request_cleanup();
615 }
616
617 void request_cleanup() override {
618 if (req) {
619 wakeup();
620 req->finish();
621 req = NULL;
622 }
623 }
624
625 int send_request() override {
626 req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct, lock, cond, secs);
627 async_rados->queue(req);
628 return 0;
629 }
630
631 int request_complete() override {
632 return req->get_ret_status();
633 }
634
635 void wakeup() {
636 req->wakeup();
637 }
638 };
639
640 class RGWShardedOmapCRManager {
641 RGWAsyncRadosProcessor *async_rados;
642 RGWRados *store;
643 RGWCoroutine *op;
644
645 int num_shards;
646
647 vector<RGWOmapAppend *> shards;
648 public:
649 RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix)
650 : async_rados(_async_rados),
651 store(_store), op(_op), num_shards(_num_shards) {
652 shards.reserve(num_shards);
653 for (int i = 0; i < num_shards; ++i) {
654 char buf[oid_prefix.size() + 16];
655 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
656 RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
657 shard->get();
658 shards.push_back(shard);
659 op->spawn(shard, false);
660 }
661 }
662
663 ~RGWShardedOmapCRManager() {
664 for (auto shard : shards) {
665 shard->put();
666 }
667 }
668
669 bool append(const string& entry, int shard_id) {
670 return shards[shard_id]->append(entry);
671 }
672 bool finish() {
673 bool success = true;
674 for (vector<RGWOmapAppend *>::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
675 success &= ((*iter)->finish() && (!(*iter)->is_error()));
676 }
677 return success;
678 }
679
680 uint64_t get_total_entries(int shard_id) {
681 return shards[shard_id]->get_total_entries();
682 }
683 };
684
685 class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
686 RGWRados *store;
687 rgw_bucket bucket;
688 RGWBucketInfo *bucket_info;
689
690 protected:
691 int _send_request() override;
692 public:
693 RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
694 RGWRados *_store, const rgw_bucket& bucket,
695 RGWBucketInfo *_bucket_info)
696 : RGWAsyncRadosRequest(caller, cn), store(_store),
697 bucket(bucket), bucket_info(_bucket_info) {}
698 };
699
700 class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
701 RGWAsyncRadosProcessor *async_rados;
702 RGWRados *store;
703 rgw_bucket bucket;
704 RGWBucketInfo *bucket_info;
705
706 RGWAsyncGetBucketInstanceInfo *req;
707
708 public:
709 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
710 const rgw_bucket& bucket, RGWBucketInfo *_bucket_info)
711 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
712 bucket(bucket), bucket_info(_bucket_info), req(NULL) {}
713 ~RGWGetBucketInstanceInfoCR() override {
714 request_cleanup();
715 }
716 void request_cleanup() override {
717 if (req) {
718 req->finish();
719 req = NULL;
720 }
721 }
722
723 int send_request() override {
724 req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, bucket, bucket_info);
725 async_rados->queue(req);
726 return 0;
727 }
728 int request_complete() override {
729 return req->get_ret_status();
730 }
731 };
732
733 class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
734 RGWRados *store;
735 string source_zone;
736
737 RGWBucketInfo bucket_info;
738
739 rgw_obj_key key;
740 uint64_t versioned_epoch;
741
742 real_time src_mtime;
743
744 bool copy_if_newer;
745 rgw_zone_set *zones_trace;
746
747 protected:
748 int _send_request() override;
749 public:
750 RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
751 const string& _source_zone,
752 RGWBucketInfo& _bucket_info,
753 const rgw_obj_key& _key,
754 uint64_t _versioned_epoch,
755 bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
756 source_zone(_source_zone),
757 bucket_info(_bucket_info),
758 key(_key),
759 versioned_epoch(_versioned_epoch),
760 copy_if_newer(_if_newer), zones_trace(_zones_trace) {}
761 };
762
763 class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
764 CephContext *cct;
765 RGWAsyncRadosProcessor *async_rados;
766 RGWRados *store;
767 string source_zone;
768
769 RGWBucketInfo bucket_info;
770
771 rgw_obj_key key;
772 uint64_t versioned_epoch;
773
774 real_time src_mtime;
775
776 bool copy_if_newer;
777
778 RGWAsyncFetchRemoteObj *req;
779 rgw_zone_set *zones_trace;
780
781 public:
782 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
783 const string& _source_zone,
784 RGWBucketInfo& _bucket_info,
785 const rgw_obj_key& _key,
786 uint64_t _versioned_epoch,
787 bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
788 async_rados(_async_rados), store(_store),
789 source_zone(_source_zone),
790 bucket_info(_bucket_info),
791 key(_key),
792 versioned_epoch(_versioned_epoch),
793 copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
794
795
796 ~RGWFetchRemoteObjCR() override {
797 request_cleanup();
798 }
799
800 void request_cleanup() override {
801 if (req) {
802 req->finish();
803 req = NULL;
804 }
805 }
806
807 int send_request() override {
808 req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
809 key, versioned_epoch, copy_if_newer, zones_trace);
810 async_rados->queue(req);
811 return 0;
812 }
813
814 int request_complete() override {
815 return req->get_ret_status();
816 }
817 };
818
819 class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
820 RGWRados *store;
821 string source_zone;
822
823 RGWBucketInfo bucket_info;
824
825 rgw_obj_key key;
826
827 ceph::real_time *pmtime;
828 uint64_t *psize;
829 map<string, bufferlist> *pattrs;
830
831 protected:
832 int _send_request() override;
833 public:
834 RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
835 const string& _source_zone,
836 RGWBucketInfo& _bucket_info,
837 const rgw_obj_key& _key,
838 ceph::real_time *_pmtime,
839 uint64_t *_psize,
840 map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
841 source_zone(_source_zone),
842 bucket_info(_bucket_info),
843 key(_key),
844 pmtime(_pmtime),
845 psize(_psize),
846 pattrs(_pattrs) {}
847 };
848
849 class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
850 CephContext *cct;
851 RGWAsyncRadosProcessor *async_rados;
852 RGWRados *store;
853 string source_zone;
854
855 RGWBucketInfo bucket_info;
856
857 rgw_obj_key key;
858
859 ceph::real_time *pmtime;
860 uint64_t *psize;
861 map<string, bufferlist> *pattrs;
862
863 RGWAsyncStatRemoteObj *req;
864
865 public:
866 RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
867 const string& _source_zone,
868 RGWBucketInfo& _bucket_info,
869 const rgw_obj_key& _key,
870 ceph::real_time *_pmtime,
871 uint64_t *_psize,
872 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
873 async_rados(_async_rados), store(_store),
874 source_zone(_source_zone),
875 bucket_info(_bucket_info),
876 key(_key),
877 pmtime(_pmtime),
878 psize(_psize),
879 pattrs(_pattrs),
880 req(NULL) {}
881
882
883 ~RGWStatRemoteObjCR() override {
884 request_cleanup();
885 }
886
887 void request_cleanup() override {
888 if (req) {
889 req->finish();
890 req = NULL;
891 }
892 }
893
894 int send_request() override {
895 req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
896 bucket_info, key, pmtime, psize, pattrs);
897 async_rados->queue(req);
898 return 0;
899 }
900
901 int request_complete() override {
902 return req->get_ret_status();
903 }
904 };
905
906 class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
907 RGWRados *store;
908 string source_zone;
909
910 RGWBucketInfo bucket_info;
911
912 rgw_obj_key key;
913 string owner;
914 string owner_display_name;
915 bool versioned;
916 uint64_t versioned_epoch;
917 string marker_version_id;
918
919 bool del_if_older;
920 ceph::real_time timestamp;
921 rgw_zone_set *zones_trace;
922
923 protected:
924 int _send_request() override;
925 public:
926 RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
927 const string& _source_zone,
928 RGWBucketInfo& _bucket_info,
929 const rgw_obj_key& _key,
930 const string& _owner,
931 const string& _owner_display_name,
932 bool _versioned,
933 uint64_t _versioned_epoch,
934 bool _delete_marker,
935 bool _if_older,
936 real_time& _timestamp,
937 rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
938 source_zone(_source_zone),
939 bucket_info(_bucket_info),
940 key(_key),
941 owner(_owner),
942 owner_display_name(_owner_display_name),
943 versioned(_versioned),
944 versioned_epoch(_versioned_epoch),
945 del_if_older(_if_older),
946 timestamp(_timestamp), zones_trace(_zones_trace) {
947 if (_delete_marker) {
948 marker_version_id = key.instance;
949 }
950 }
951 };
952
953 class RGWRemoveObjCR : public RGWSimpleCoroutine {
954 CephContext *cct;
955 RGWAsyncRadosProcessor *async_rados;
956 RGWRados *store;
957 string source_zone;
958
959 RGWBucketInfo bucket_info;
960
961 rgw_obj_key key;
962 bool versioned;
963 uint64_t versioned_epoch;
964 bool delete_marker;
965 string owner;
966 string owner_display_name;
967
968 bool del_if_older;
969 real_time timestamp;
970
971 RGWAsyncRemoveObj *req;
972
973 rgw_zone_set *zones_trace;
974
975 public:
976 RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
977 const string& _source_zone,
978 RGWBucketInfo& _bucket_info,
979 const rgw_obj_key& _key,
980 bool _versioned,
981 uint64_t _versioned_epoch,
982 string *_owner,
983 string *_owner_display_name,
984 bool _delete_marker,
985 real_time *_timestamp,
986 rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
987 async_rados(_async_rados), store(_store),
988 source_zone(_source_zone),
989 bucket_info(_bucket_info),
990 key(_key),
991 versioned(_versioned),
992 versioned_epoch(_versioned_epoch),
993 delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
994 del_if_older = (_timestamp != NULL);
995 if (_timestamp) {
996 timestamp = *_timestamp;
997 }
998
999 if (_owner) {
1000 owner = *_owner;
1001 }
1002
1003 if (_owner_display_name) {
1004 owner_display_name = *_owner_display_name;
1005 }
1006 }
1007 ~RGWRemoveObjCR() override {
1008 request_cleanup();
1009 }
1010
1011 void request_cleanup() override {
1012 if (req) {
1013 req->finish();
1014 req = NULL;
1015 }
1016 }
1017
1018 int send_request() override {
1019 req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
1020 key, owner, owner_display_name, versioned, versioned_epoch,
1021 delete_marker, del_if_older, timestamp, zones_trace);
1022 async_rados->queue(req);
1023 return 0;
1024 }
1025
1026 int request_complete() override {
1027 return req->get_ret_status();
1028 }
1029 };
1030
1031 class RGWContinuousLeaseCR : public RGWCoroutine {
1032 RGWAsyncRadosProcessor *async_rados;
1033 RGWRados *store;
1034
1035 const rgw_raw_obj obj;
1036
1037 const string lock_name;
1038 const string cookie;
1039
1040 int interval;
1041
1042 Mutex lock;
1043 std::atomic<bool> going_down = { false };
1044 bool locked{false};
1045
1046 RGWCoroutine *caller;
1047
1048 bool aborted{false};
1049
1050 public:
1051 RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1052 const rgw_raw_obj& _obj,
1053 const string& _lock_name, int _interval, RGWCoroutine *_caller)
1054 : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
1055 obj(_obj), lock_name(_lock_name),
1056 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
1057 interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
1058 {}
1059
1060 int operate() override;
1061
1062 bool is_locked() {
1063 Mutex::Locker l(lock);
1064 return locked;
1065 }
1066
1067 void set_locked(bool status) {
1068 Mutex::Locker l(lock);
1069 locked = status;
1070 }
1071
1072 void go_down() {
1073 going_down = true;
1074 wakeup();
1075 }
1076
1077 void abort() {
1078 aborted = true;
1079 }
1080 };
1081
1082 class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
1083 RGWRados *store;
1084 list<cls_log_entry> entries;
1085
1086 string oid;
1087
1088 RGWAioCompletionNotifier *cn;
1089
1090 public:
1091 RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
1092 const cls_log_entry& entry);
1093 ~RGWRadosTimelogAddCR() override;
1094
1095 int send_request() override;
1096 int request_complete() override;
1097 };
1098
1099 class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
1100 RGWRados *store;
1101 RGWAioCompletionNotifier *cn{nullptr};
1102 protected:
1103 std::string oid;
1104 real_time start_time;
1105 real_time end_time;
1106 std::string from_marker;
1107 std::string to_marker;
1108
1109 public:
1110 RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
1111 const real_time& start_time, const real_time& end_time,
1112 const std::string& from_marker,
1113 const std::string& to_marker);
1114 ~RGWRadosTimelogTrimCR() override;
1115
1116 int send_request() override;
1117 int request_complete() override;
1118 };
1119
1120 // wrapper to update last_trim_marker on success
1121 class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
1122 CephContext *cct;
1123 std::string *last_trim_marker;
1124 public:
1125 RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
1126 const std::string& to_marker, std::string *last_trim_marker);
1127 int request_complete() override;
1128 };
1129
1130 class RGWAsyncStatObj : public RGWAsyncRadosRequest {
1131 RGWRados *store;
1132 RGWBucketInfo bucket_info;
1133 rgw_obj obj;
1134 uint64_t *psize;
1135 real_time *pmtime;
1136 uint64_t *pepoch;
1137 RGWObjVersionTracker *objv_tracker;
1138 protected:
1139 int _send_request() override;
1140 public:
1141 RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store,
1142 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1143 real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
1144 RGWObjVersionTracker *objv_tracker = nullptr)
1145 : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize),
1146 pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
1147 };
1148
1149 class RGWStatObjCR : public RGWSimpleCoroutine {
1150 RGWRados *store;
1151 RGWAsyncRadosProcessor *async_rados;
1152 RGWBucketInfo bucket_info;
1153 rgw_obj obj;
1154 uint64_t *psize;
1155 real_time *pmtime;
1156 uint64_t *pepoch;
1157 RGWObjVersionTracker *objv_tracker;
1158 RGWAsyncStatObj *req = nullptr;
1159 public:
1160 RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
1161 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1162 real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
1163 RGWObjVersionTracker *objv_tracker = nullptr);
1164 ~RGWStatObjCR() override {
1165 request_cleanup();
1166 }
1167 void request_cleanup() override;
1168
1169 int send_request() override;
1170 int request_complete() override;
1171 };
1172
1173 #endif