]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_cr_rados.h
update sources to v12.1.0
[ceph.git] / ceph / src / rgw / rgw_cr_rados.h
CommitLineData
7c673cae
FG
1#ifndef CEPH_RGW_CR_RADOS_H
2#define CEPH_RGW_CR_RADOS_H
3
4#include <boost/intrusive_ptr.hpp>
5#include "include/assert.h"
6#include "rgw_coroutine.h"
7#include "rgw_rados.h"
8#include "common/WorkQueue.h"
9#include "common/Throttle.h"
10
11#include <atomic>
12
13class RGWAsyncRadosRequest : public RefCountedObject {
14 RGWCoroutine *caller;
15 RGWAioCompletionNotifier *notifier;
16
17 int retcode;
18
19 Mutex lock;
20
21protected:
22 virtual int _send_request() = 0;
23public:
24 RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0),
25 lock("RGWAsyncRadosRequest::lock") {
26 }
27 ~RGWAsyncRadosRequest() override {
28 if (notifier) {
29 notifier->put();
30 }
31 }
32
33 void send_request() {
34 get();
35 retcode = _send_request();
36 {
37 Mutex::Locker l(lock);
38 if (notifier) {
39 notifier->cb(); // drops its own ref
40 notifier = nullptr;
41 }
42 }
43 put();
44 }
45
46 int get_ret_status() { return retcode; }
47
48 void finish() {
49 {
50 Mutex::Locker l(lock);
51 if (notifier) {
52 // we won't call notifier->cb() to drop its ref, so drop it here
53 notifier->put();
54 notifier = nullptr;
55 }
56 }
57 put();
58 }
59};
60
61
62class RGWAsyncRadosProcessor {
63 deque<RGWAsyncRadosRequest *> m_req_queue;
64 std::atomic<bool> going_down = { false };
65protected:
66 RGWRados *store;
67 ThreadPool m_tp;
68 Throttle req_throttle;
69
70 struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
71 RGWAsyncRadosProcessor *processor;
72 RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
73 : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
74
75 bool _enqueue(RGWAsyncRadosRequest *req) override;
76 void _dequeue(RGWAsyncRadosRequest *req) override {
77 ceph_abort();
78 }
79 bool _empty() override;
80 RGWAsyncRadosRequest *_dequeue() override;
81 using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
82 void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
83 void _dump_queue();
84 void _clear() override {
85 assert(processor->m_req_queue.empty());
86 }
87 } req_wq;
88
89public:
90 RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
91 ~RGWAsyncRadosProcessor() {}
92 void start();
93 void stop();
94 void handle_request(RGWAsyncRadosRequest *req);
95 void queue(RGWAsyncRadosRequest *req);
96
97 bool is_going_down() {
98 return going_down;
99 }
100};
101
102
103class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
104 RGWRados *store;
105 RGWObjectCtx *obj_ctx;
106 RGWRados::SystemObject::Read::GetObjState read_state;
107 RGWObjVersionTracker *objv_tracker;
108 rgw_raw_obj obj;
109 bufferlist *pbl;
110 map<string, bufferlist> *pattrs;
111 off_t ofs;
112 off_t end;
113protected:
114 int _send_request() override;
115public:
116 RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
117 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
118 bufferlist *_pbl, off_t _ofs, off_t _end);
119 void set_read_attrs(map<string, bufferlist> *_pattrs) { pattrs = _pattrs; }
120};
121
122class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
123 RGWRados *store;
124 RGWObjVersionTracker *objv_tracker;
125 rgw_raw_obj obj;
126 bool exclusive;
127 bufferlist bl;
128
129protected:
130 int _send_request() override;
131public:
132 RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
133 RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
134 bool _exclusive, bufferlist& _bl);
135};
136
137class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
138 RGWRados *store;
139 RGWObjVersionTracker *objv_tracker;
140 rgw_raw_obj obj;
141 map<string, bufferlist> *attrs;
142
143protected:
144 int _send_request() override;
145public:
146 RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
147 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
148 map<string, bufferlist> *_attrs);
149};
150
151class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
152 RGWRados *store;
153 rgw_raw_obj obj;
154 string lock_name;
155 string cookie;
156 uint32_t duration_secs;
157
158protected:
159 int _send_request() override;
160public:
161 RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
162 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
163 const string& _name, const string& _cookie, uint32_t _duration_secs);
164};
165
166class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
167 RGWRados *store;
168 rgw_raw_obj obj;
169 string lock_name;
170 string cookie;
171
172protected:
173 int _send_request() override;
174public:
175 RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
176 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
177 const string& _name, const string& _cookie);
178};
179
180
181template <class T>
182class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
183 RGWAsyncRadosProcessor *async_rados;
184 RGWRados *store;
185 RGWObjectCtx obj_ctx;
186 bufferlist bl;
187
188 rgw_raw_obj obj;
189
190 map<string, bufferlist> *pattrs{nullptr};
191
192 T *result;
193 /// on ENOENT, call handle_data() with an empty object instead of failing
194 const bool empty_on_enoent;
195 RGWObjVersionTracker *objv_tracker;
196
197 RGWAsyncGetSystemObj *req{nullptr};
198
199public:
200 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
201 const rgw_raw_obj& _obj,
202 T *_result, bool empty_on_enoent = true,
203 RGWObjVersionTracker *objv_tracker = nullptr)
204 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
205 obj_ctx(store), obj(_obj), result(_result),
206 empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
207 ~RGWSimpleRadosReadCR() override {
208 request_cleanup();
209 }
210
211 void request_cleanup() override {
212 if (req) {
213 req->finish();
214 req = NULL;
215 }
216 }
217
218 int send_request() override;
219 int request_complete() override;
220
221 virtual int handle_data(T& data) {
222 return 0;
223 }
224};
225
226template <class T>
227int RGWSimpleRadosReadCR<T>::send_request()
228{
229 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
230 store, &obj_ctx, objv_tracker,
231 obj,
232 &bl, 0, -1);
233 if (pattrs) {
234 req->set_read_attrs(pattrs);
235 }
236 async_rados->queue(req);
237 return 0;
238}
239
240template <class T>
241int RGWSimpleRadosReadCR<T>::request_complete()
242{
243 int ret = req->get_ret_status();
244 retcode = ret;
245 if (ret == -ENOENT && empty_on_enoent) {
246 *result = T();
247 } else {
248 if (ret < 0) {
249 return ret;
250 }
251 try {
252 bufferlist::iterator iter = bl.begin();
253 if (iter.end()) {
254 // allow successful reads with empty buffers. ReadSyncStatus coroutines
255 // depend on this to be able to read without locking, because the
256 // cls lock from InitSyncStatus will create an empty object if it didnt
257 // exist
258 *result = T();
259 } else {
260 ::decode(*result, iter);
261 }
262 } catch (buffer::error& err) {
263 return -EIO;
264 }
265 }
266
267 return handle_data(*result);
268}
269
270class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
271 RGWAsyncRadosProcessor *async_rados;
272 RGWRados *store;
273 RGWObjectCtx obj_ctx;
274 bufferlist bl;
275
276 rgw_raw_obj obj;
277
278 map<string, bufferlist> *pattrs;
279
280 RGWAsyncGetSystemObj *req;
281
282public:
283 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
284 const rgw_raw_obj& _obj,
285 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()),
286 async_rados(_async_rados), store(_store),
287 obj_ctx(store),
288 obj(_obj),
289 pattrs(_pattrs),
290 req(NULL) { }
291 ~RGWSimpleRadosReadAttrsCR() override {
292 request_cleanup();
293 }
294
295 void request_cleanup() override {
296 if (req) {
297 req->finish();
298 req = NULL;
299 }
300 }
301
302 int send_request() override;
303 int request_complete() override;
304};
305
306template <class T>
307class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
308 RGWAsyncRadosProcessor *async_rados;
309 RGWRados *store;
310 bufferlist bl;
311
312 rgw_raw_obj obj;
313 RGWObjVersionTracker *objv_tracker;
314
315 RGWAsyncPutSystemObj *req{nullptr};
316
317public:
318 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
319 const rgw_raw_obj& _obj,
320 const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
321 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
322 store(_store), obj(_obj), objv_tracker(objv_tracker) {
323 ::encode(_data, bl);
324 }
325
326 ~RGWSimpleRadosWriteCR() override {
327 request_cleanup();
328 }
329
330 void request_cleanup() override {
331 if (req) {
332 req->finish();
333 req = NULL;
334 }
335 }
336
337 int send_request() override {
338 req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
339 store, objv_tracker, obj, false, bl);
340 async_rados->queue(req);
341 return 0;
342 }
343
344 int request_complete() override {
345 return req->get_ret_status();
346 }
347};
348
349class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
350 RGWAsyncRadosProcessor *async_rados;
351 RGWRados *store;
352
353 rgw_raw_obj obj;
354
355 map<string, bufferlist> attrs;
356
357 RGWAsyncPutSystemObjAttrs *req;
358
359public:
360 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
361 const rgw_raw_obj& _obj,
362 map<string, bufferlist>& _attrs) : RGWSimpleCoroutine(_store->ctx()),
363 async_rados(_async_rados),
364 store(_store),
365 obj(_obj),
366 attrs(_attrs), req(NULL) {
367 }
368 ~RGWSimpleRadosWriteAttrsCR() override {
369 request_cleanup();
370 }
371
372 void request_cleanup() override {
373 if (req) {
374 req->finish();
375 req = NULL;
376 }
377 }
378
379 int send_request() override {
380 req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
381 store, NULL, obj, &attrs);
382 async_rados->queue(req);
383 return 0;
384 }
385
386 int request_complete() override {
387 return req->get_ret_status();
388 }
389};
390
391class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
392 RGWRados *store;
393 map<string, bufferlist> entries;
394
395 rgw_rados_ref ref;
396
397 rgw_raw_obj obj;
398
399 RGWAioCompletionNotifier *cn;
400
401public:
402 RGWRadosSetOmapKeysCR(RGWRados *_store,
403 const rgw_raw_obj& _obj,
404 map<string, bufferlist>& _entries);
405
406 ~RGWRadosSetOmapKeysCR() override;
407
408 int send_request() override;
409 int request_complete() override;
410};
411
412class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
413 RGWRados *store;
414
415 string marker;
416 map<string, bufferlist> *entries;
417 int max_entries;
418
419 int rval;
420 rgw_rados_ref ref;
421
422 rgw_raw_obj obj;
423
424 RGWAioCompletionNotifier *cn;
425
426public:
427 RGWRadosGetOmapKeysCR(RGWRados *_store,
428 const rgw_raw_obj& _obj,
429 const string& _marker,
430 map<string, bufferlist> *_entries, int _max_entries);
431
432 ~RGWRadosGetOmapKeysCR() override;
433
434 int send_request() override;
435
436 int request_complete() override {
437 return rval;
438 }
439};
440
441class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
442 RGWRados *store;
443
444 string marker;
445 map<string, bufferlist> *entries;
446 int max_entries;
447
448 int rval;
449 rgw_rados_ref ref;
450
451 set<string> keys;
452
453 rgw_raw_obj obj;
454
455 RGWAioCompletionNotifier *cn;
456
457public:
458 RGWRadosRemoveOmapKeysCR(RGWRados *_store,
459 const rgw_raw_obj& _obj,
460 const set<string>& _keys);
461
462 ~RGWRadosRemoveOmapKeysCR() override;
463
464 int send_request() override;
465
466 int request_complete() override {
467 return rval;
468 }
469};
470
471class RGWRadosRemoveCR : public RGWSimpleCoroutine {
472 RGWRados *store;
473 librados::IoCtx ioctx;
474 const rgw_raw_obj obj;
475 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
476
477public:
478 RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
479
480 int send_request();
481 int request_complete();
482};
483
484class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
485 RGWAsyncRadosProcessor *async_rados;
486 RGWRados *store;
487 string lock_name;
488 string cookie;
489 uint32_t duration;
490
491 rgw_raw_obj obj;
492
493 RGWAsyncLockSystemObj *req;
494
495public:
496 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
497 const rgw_raw_obj& _obj,
498 const string& _lock_name,
499 const string& _cookie,
500 uint32_t _duration);
501 ~RGWSimpleRadosLockCR() override {
502 request_cleanup();
503 }
504 void request_cleanup() override;
505
506 int send_request() override;
507 int request_complete() override;
508
509 static std::string gen_random_cookie(CephContext* cct) {
510#define COOKIE_LEN 16
511 char buf[COOKIE_LEN + 1];
512 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
513 return buf;
514 }
515};
516
517class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
518 RGWAsyncRadosProcessor *async_rados;
519 RGWRados *store;
520 string lock_name;
521 string cookie;
522
523 rgw_raw_obj obj;
524
525 RGWAsyncUnlockSystemObj *req;
526
527public:
528 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
529 const rgw_raw_obj& _obj,
530 const string& _lock_name,
531 const string& _cookie);
532 ~RGWSimpleRadosUnlockCR() override {
533 request_cleanup();
534 }
535 void request_cleanup() override;
536
537 int send_request() override;
538 int request_complete() override;
539};
540
541#define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
542
543class RGWOmapAppend : public RGWConsumerCR<string> {
544 RGWAsyncRadosProcessor *async_rados;
545 RGWRados *store;
546
547 rgw_raw_obj obj;
548
549 bool going_down;
550
551 int num_pending_entries;
552 list<string> pending_entries;
553
554 map<string, bufferlist> entries;
555
556 uint64_t window_size;
557 uint64_t total_entries;
558public:
559 RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
560 const rgw_raw_obj& _obj,
561 uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
562 int operate() override;
563 void flush_pending();
564 bool append(const string& s);
565 bool finish();
566
567 uint64_t get_total_entries() {
568 return total_entries;
569 }
570
571 const rgw_raw_obj& get_obj() {
572 return obj;
573 }
574};
575
576class RGWAsyncWait : public RGWAsyncRadosRequest {
577 CephContext *cct;
578 Mutex *lock;
579 Cond *cond;
580 utime_t interval;
581protected:
582 int _send_request() override {
583 Mutex::Locker l(*lock);
584 return cond->WaitInterval(*lock, interval);
585 }
586public:
587 RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
588 Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn),
589 cct(_cct),
590 lock(_lock), cond(_cond), interval(_secs, 0) {}
591
592 void wakeup() {
593 Mutex::Locker l(*lock);
594 cond->Signal();
595 }
596};
597
598class RGWWaitCR : public RGWSimpleCoroutine {
599 CephContext *cct;
600 RGWAsyncRadosProcessor *async_rados;
601 Mutex *lock;
602 Cond *cond;
603 int secs;
604
605 RGWAsyncWait *req;
606
607public:
608 RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
609 Mutex *_lock, Cond *_cond,
610 int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
611 async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
612 }
613 ~RGWWaitCR() override {
614 request_cleanup();
615 }
616
617 void request_cleanup() override {
618 if (req) {
619 wakeup();
620 req->finish();
621 req = NULL;
622 }
623 }
624
625 int send_request() override {
626 req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct, lock, cond, secs);
627 async_rados->queue(req);
628 return 0;
629 }
630
631 int request_complete() override {
632 return req->get_ret_status();
633 }
634
635 void wakeup() {
636 req->wakeup();
637 }
638};
639
640class RGWShardedOmapCRManager {
641 RGWAsyncRadosProcessor *async_rados;
642 RGWRados *store;
643 RGWCoroutine *op;
644
645 int num_shards;
646
647 vector<RGWOmapAppend *> shards;
648public:
649 RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix)
650 : async_rados(_async_rados),
651 store(_store), op(_op), num_shards(_num_shards) {
652 shards.reserve(num_shards);
653 for (int i = 0; i < num_shards; ++i) {
654 char buf[oid_prefix.size() + 16];
655 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
656 RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
657 shard->get();
658 shards.push_back(shard);
659 op->spawn(shard, false);
660 }
661 }
662
663 ~RGWShardedOmapCRManager() {
664 for (auto shard : shards) {
665 shard->put();
666 }
667 }
668
669 bool append(const string& entry, int shard_id) {
670 return shards[shard_id]->append(entry);
671 }
672 bool finish() {
673 bool success = true;
674 for (vector<RGWOmapAppend *>::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
675 success &= ((*iter)->finish() && (!(*iter)->is_error()));
676 }
677 return success;
678 }
679
680 uint64_t get_total_entries(int shard_id) {
681 return shards[shard_id]->get_total_entries();
682 }
683};
684
685class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
686 RGWRados *store;
687 rgw_bucket bucket;
688 RGWBucketInfo *bucket_info;
689
690protected:
691 int _send_request() override;
692public:
693 RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
694 RGWRados *_store, const rgw_bucket& bucket,
695 RGWBucketInfo *_bucket_info)
696 : RGWAsyncRadosRequest(caller, cn), store(_store),
697 bucket(bucket), bucket_info(_bucket_info) {}
698};
699
700class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
701 RGWAsyncRadosProcessor *async_rados;
702 RGWRados *store;
703 rgw_bucket bucket;
704 RGWBucketInfo *bucket_info;
705
706 RGWAsyncGetBucketInstanceInfo *req;
707
708public:
709 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
710 const rgw_bucket& bucket, RGWBucketInfo *_bucket_info)
711 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
712 bucket(bucket), bucket_info(_bucket_info), req(NULL) {}
713 ~RGWGetBucketInstanceInfoCR() override {
714 request_cleanup();
715 }
716 void request_cleanup() override {
717 if (req) {
718 req->finish();
719 req = NULL;
720 }
721 }
722
723 int send_request() override {
724 req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, bucket, bucket_info);
725 async_rados->queue(req);
726 return 0;
727 }
728 int request_complete() override {
729 return req->get_ret_status();
730 }
731};
732
733class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
734 RGWRados *store;
735 string source_zone;
736
737 RGWBucketInfo bucket_info;
738
739 rgw_obj_key key;
740 uint64_t versioned_epoch;
741
742 real_time src_mtime;
743
744 bool copy_if_newer;
31f18b77 745 rgw_zone_set *zones_trace;
7c673cae
FG
746
747protected:
748 int _send_request() override;
749public:
750 RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
751 const string& _source_zone,
752 RGWBucketInfo& _bucket_info,
753 const rgw_obj_key& _key,
754 uint64_t _versioned_epoch,
31f18b77 755 bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
7c673cae
FG
756 source_zone(_source_zone),
757 bucket_info(_bucket_info),
758 key(_key),
759 versioned_epoch(_versioned_epoch),
31f18b77 760 copy_if_newer(_if_newer), zones_trace(_zones_trace) {}
7c673cae
FG
761};
762
763class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
764 CephContext *cct;
765 RGWAsyncRadosProcessor *async_rados;
766 RGWRados *store;
767 string source_zone;
768
769 RGWBucketInfo bucket_info;
770
771 rgw_obj_key key;
772 uint64_t versioned_epoch;
773
774 real_time src_mtime;
775
776 bool copy_if_newer;
777
778 RGWAsyncFetchRemoteObj *req;
31f18b77 779 rgw_zone_set *zones_trace;
7c673cae
FG
780
781public:
782 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
783 const string& _source_zone,
784 RGWBucketInfo& _bucket_info,
785 const rgw_obj_key& _key,
786 uint64_t _versioned_epoch,
31f18b77 787 bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
7c673cae
FG
788 async_rados(_async_rados), store(_store),
789 source_zone(_source_zone),
790 bucket_info(_bucket_info),
791 key(_key),
792 versioned_epoch(_versioned_epoch),
31f18b77 793 copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
7c673cae
FG
794
795
796 ~RGWFetchRemoteObjCR() override {
797 request_cleanup();
798 }
799
800 void request_cleanup() override {
801 if (req) {
802 req->finish();
803 req = NULL;
804 }
805 }
806
807 int send_request() override {
808 req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
31f18b77 809 key, versioned_epoch, copy_if_newer, zones_trace);
7c673cae
FG
810 async_rados->queue(req);
811 return 0;
812 }
813
814 int request_complete() override {
815 return req->get_ret_status();
816 }
817};
818
819class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
820 RGWRados *store;
821 string source_zone;
822
823 RGWBucketInfo bucket_info;
824
825 rgw_obj_key key;
826
827 ceph::real_time *pmtime;
828 uint64_t *psize;
829 map<string, bufferlist> *pattrs;
830
831protected:
832 int _send_request() override;
833public:
834 RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
835 const string& _source_zone,
836 RGWBucketInfo& _bucket_info,
837 const rgw_obj_key& _key,
838 ceph::real_time *_pmtime,
839 uint64_t *_psize,
840 map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
841 source_zone(_source_zone),
842 bucket_info(_bucket_info),
843 key(_key),
844 pmtime(_pmtime),
845 psize(_psize),
846 pattrs(_pattrs) {}
847};
848
849class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
850 CephContext *cct;
851 RGWAsyncRadosProcessor *async_rados;
852 RGWRados *store;
853 string source_zone;
854
855 RGWBucketInfo bucket_info;
856
857 rgw_obj_key key;
858
859 ceph::real_time *pmtime;
860 uint64_t *psize;
861 map<string, bufferlist> *pattrs;
862
863 RGWAsyncStatRemoteObj *req;
864
865public:
866 RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
867 const string& _source_zone,
868 RGWBucketInfo& _bucket_info,
869 const rgw_obj_key& _key,
870 ceph::real_time *_pmtime,
871 uint64_t *_psize,
872 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
873 async_rados(_async_rados), store(_store),
874 source_zone(_source_zone),
875 bucket_info(_bucket_info),
876 key(_key),
877 pmtime(_pmtime),
878 psize(_psize),
879 pattrs(_pattrs),
880 req(NULL) {}
881
882
883 ~RGWStatRemoteObjCR() override {
884 request_cleanup();
885 }
886
887 void request_cleanup() override {
888 if (req) {
889 req->finish();
890 req = NULL;
891 }
892 }
893
894 int send_request() override {
895 req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
896 bucket_info, key, pmtime, psize, pattrs);
897 async_rados->queue(req);
898 return 0;
899 }
900
901 int request_complete() override {
902 return req->get_ret_status();
903 }
904};
905
906class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
907 RGWRados *store;
908 string source_zone;
909
910 RGWBucketInfo bucket_info;
911
912 rgw_obj_key key;
913 string owner;
914 string owner_display_name;
915 bool versioned;
916 uint64_t versioned_epoch;
917 string marker_version_id;
918
919 bool del_if_older;
920 ceph::real_time timestamp;
31f18b77 921 rgw_zone_set *zones_trace;
7c673cae
FG
922
923protected:
924 int _send_request() override;
925public:
926 RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
927 const string& _source_zone,
928 RGWBucketInfo& _bucket_info,
929 const rgw_obj_key& _key,
930 const string& _owner,
931 const string& _owner_display_name,
932 bool _versioned,
933 uint64_t _versioned_epoch,
934 bool _delete_marker,
935 bool _if_older,
31f18b77
FG
936 real_time& _timestamp,
937 rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
7c673cae
FG
938 source_zone(_source_zone),
939 bucket_info(_bucket_info),
940 key(_key),
941 owner(_owner),
942 owner_display_name(_owner_display_name),
943 versioned(_versioned),
944 versioned_epoch(_versioned_epoch),
945 del_if_older(_if_older),
31f18b77 946 timestamp(_timestamp), zones_trace(_zones_trace) {
7c673cae
FG
947 if (_delete_marker) {
948 marker_version_id = key.instance;
949 }
950 }
951};
952
953class RGWRemoveObjCR : public RGWSimpleCoroutine {
954 CephContext *cct;
955 RGWAsyncRadosProcessor *async_rados;
956 RGWRados *store;
957 string source_zone;
958
959 RGWBucketInfo bucket_info;
960
961 rgw_obj_key key;
962 bool versioned;
963 uint64_t versioned_epoch;
964 bool delete_marker;
965 string owner;
966 string owner_display_name;
967
968 bool del_if_older;
969 real_time timestamp;
970
971 RGWAsyncRemoveObj *req;
31f18b77
FG
972
973 rgw_zone_set *zones_trace;
7c673cae
FG
974
975public:
976 RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
977 const string& _source_zone,
978 RGWBucketInfo& _bucket_info,
979 const rgw_obj_key& _key,
980 bool _versioned,
981 uint64_t _versioned_epoch,
982 string *_owner,
983 string *_owner_display_name,
984 bool _delete_marker,
31f18b77
FG
985 real_time *_timestamp,
986 rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
7c673cae
FG
987 async_rados(_async_rados), store(_store),
988 source_zone(_source_zone),
989 bucket_info(_bucket_info),
990 key(_key),
991 versioned(_versioned),
992 versioned_epoch(_versioned_epoch),
31f18b77 993 delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
7c673cae
FG
994 del_if_older = (_timestamp != NULL);
995 if (_timestamp) {
996 timestamp = *_timestamp;
997 }
998
999 if (_owner) {
1000 owner = *_owner;
1001 }
1002
1003 if (_owner_display_name) {
1004 owner_display_name = *_owner_display_name;
1005 }
1006 }
1007 ~RGWRemoveObjCR() override {
1008 request_cleanup();
1009 }
1010
1011 void request_cleanup() override {
1012 if (req) {
1013 req->finish();
1014 req = NULL;
1015 }
1016 }
1017
1018 int send_request() override {
1019 req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
1020 key, owner, owner_display_name, versioned, versioned_epoch,
31f18b77 1021 delete_marker, del_if_older, timestamp, zones_trace);
7c673cae
FG
1022 async_rados->queue(req);
1023 return 0;
1024 }
1025
1026 int request_complete() override {
1027 return req->get_ret_status();
1028 }
1029};
1030
1031class RGWContinuousLeaseCR : public RGWCoroutine {
1032 RGWAsyncRadosProcessor *async_rados;
1033 RGWRados *store;
1034
1035 const rgw_raw_obj obj;
1036
1037 const string lock_name;
1038 const string cookie;
1039
1040 int interval;
1041
1042 Mutex lock;
1043 std::atomic<bool> going_down = { false };
1044 bool locked{false};
1045
1046 RGWCoroutine *caller;
1047
1048 bool aborted{false};
1049
1050public:
1051 RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1052 const rgw_raw_obj& _obj,
1053 const string& _lock_name, int _interval, RGWCoroutine *_caller)
1054 : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
1055 obj(_obj), lock_name(_lock_name),
1056 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
1057 interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
1058 {}
1059
1060 int operate() override;
1061
1062 bool is_locked() {
1063 Mutex::Locker l(lock);
1064 return locked;
1065 }
1066
1067 void set_locked(bool status) {
1068 Mutex::Locker l(lock);
1069 locked = status;
1070 }
1071
1072 void go_down() {
1073 going_down = true;
1074 wakeup();
1075 }
1076
1077 void abort() {
1078 aborted = true;
1079 }
1080};
1081
1082class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
1083 RGWRados *store;
1084 list<cls_log_entry> entries;
1085
1086 string oid;
1087
1088 RGWAioCompletionNotifier *cn;
1089
1090public:
1091 RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
1092 const cls_log_entry& entry);
1093 ~RGWRadosTimelogAddCR() override;
1094
1095 int send_request() override;
1096 int request_complete() override;
1097};
1098
1099class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
1100 RGWRados *store;
1101 RGWAioCompletionNotifier *cn{nullptr};
1102 protected:
1103 std::string oid;
1104 real_time start_time;
1105 real_time end_time;
1106 std::string from_marker;
1107 std::string to_marker;
1108
1109 public:
1110 RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
1111 const real_time& start_time, const real_time& end_time,
1112 const std::string& from_marker,
1113 const std::string& to_marker);
1114 ~RGWRadosTimelogTrimCR() override;
1115
1116 int send_request() override;
1117 int request_complete() override;
1118};
1119
1120// wrapper to update last_trim_marker on success
1121class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
1122 CephContext *cct;
1123 std::string *last_trim_marker;
1124 public:
1125 RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
1126 const std::string& to_marker, std::string *last_trim_marker);
1127 int request_complete() override;
1128};
1129
1130class RGWAsyncStatObj : public RGWAsyncRadosRequest {
1131 RGWRados *store;
1132 RGWBucketInfo bucket_info;
1133 rgw_obj obj;
1134 uint64_t *psize;
1135 real_time *pmtime;
1136 uint64_t *pepoch;
1137 RGWObjVersionTracker *objv_tracker;
1138protected:
1139 int _send_request() override;
1140public:
1141 RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store,
1142 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1143 real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
1144 RGWObjVersionTracker *objv_tracker = nullptr)
1145 : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize),
1146 pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
1147};
1148
1149class RGWStatObjCR : public RGWSimpleCoroutine {
1150 RGWRados *store;
1151 RGWAsyncRadosProcessor *async_rados;
1152 RGWBucketInfo bucket_info;
1153 rgw_obj obj;
1154 uint64_t *psize;
1155 real_time *pmtime;
1156 uint64_t *pepoch;
1157 RGWObjVersionTracker *objv_tracker;
1158 RGWAsyncStatObj *req = nullptr;
1159 public:
1160 RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
1161 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1162 real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
1163 RGWObjVersionTracker *objv_tracker = nullptr);
1164 ~RGWStatObjCR() override {
1165 request_cleanup();
1166 }
1167 void request_cleanup() override;
1168
1169 int send_request() override;
1170 int request_complete() override;
1171};
1172
1173#endif