]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rados.h
update sources to 12.2.10
[ceph.git] / ceph / src / rgw / rgw_cr_rados.h
1 #ifndef CEPH_RGW_CR_RADOS_H
2 #define CEPH_RGW_CR_RADOS_H
3
4 #include <boost/intrusive_ptr.hpp>
5 #include "include/assert.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_rados.h"
8 #include "common/WorkQueue.h"
9 #include "common/Throttle.h"
10
11 #include <atomic>
12
13 class RGWAsyncRadosRequest : public RefCountedObject {
14 RGWCoroutine *caller;
15 RGWAioCompletionNotifier *notifier;
16
17 int retcode;
18
19 Mutex lock;
20
21 protected:
22 virtual int _send_request() = 0;
23 public:
24 RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0),
25 lock("RGWAsyncRadosRequest::lock") {
26 }
27 ~RGWAsyncRadosRequest() override {
28 if (notifier) {
29 notifier->put();
30 }
31 }
32
33 void send_request() {
34 get();
35 retcode = _send_request();
36 {
37 Mutex::Locker l(lock);
38 if (notifier) {
39 notifier->cb(); // drops its own ref
40 notifier = nullptr;
41 }
42 }
43 put();
44 }
45
46 int get_ret_status() { return retcode; }
47
48 void finish() {
49 {
50 Mutex::Locker l(lock);
51 if (notifier) {
52 // we won't call notifier->cb() to drop its ref, so drop it here
53 notifier->put();
54 notifier = nullptr;
55 }
56 }
57 put();
58 }
59 };
60
61
62 class RGWAsyncRadosProcessor {
63 deque<RGWAsyncRadosRequest *> m_req_queue;
64 std::atomic<bool> going_down = { false };
65 protected:
66 RGWRados *store;
67 ThreadPool m_tp;
68 Throttle req_throttle;
69
70 struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
71 RGWAsyncRadosProcessor *processor;
72 RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
73 : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
74
75 bool _enqueue(RGWAsyncRadosRequest *req) override;
76 void _dequeue(RGWAsyncRadosRequest *req) override {
77 ceph_abort();
78 }
79 bool _empty() override;
80 RGWAsyncRadosRequest *_dequeue() override;
81 using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
82 void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
83 void _dump_queue();
84 void _clear() override {
85 assert(processor->m_req_queue.empty());
86 }
87 } req_wq;
88
89 public:
90 RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
91 ~RGWAsyncRadosProcessor() {}
92 void start();
93 void stop();
94 void handle_request(RGWAsyncRadosRequest *req);
95 void queue(RGWAsyncRadosRequest *req);
96
97 bool is_going_down() {
98 return going_down;
99 }
100 };
101
102
103 class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
104 RGWRados *store;
105 RGWObjectCtx obj_ctx;
106 RGWRados::SystemObject::Read::GetObjState read_state;
107 RGWObjVersionTracker objv_tracker;
108 rgw_raw_obj obj;
109 off_t ofs;
110 off_t end;
111 const bool want_attrs;
112 protected:
113 int _send_request() override;
114 public:
115 RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
116 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
117 off_t _ofs, off_t _end, bool want_attrs);
118
119 bufferlist bl;
120 map<string, bufferlist> attrs;
121 };
122
123 class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
124 RGWRados *store;
125 rgw_raw_obj obj;
126 bool exclusive;
127 bufferlist bl;
128
129 protected:
130 int _send_request() override;
131 public:
132 RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
133 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
134 bool _exclusive, bufferlist _bl);
135
136 RGWObjVersionTracker objv_tracker;
137 };
138
139 class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
140 RGWRados *store;
141 rgw_raw_obj obj;
142 map<string, bufferlist> attrs;
143
144 protected:
145 int _send_request() override;
146 public:
147 RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
148 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
149 map<string, bufferlist> _attrs);
150
151 RGWObjVersionTracker objv_tracker;
152 };
153
154 class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
155 RGWRados *store;
156 rgw_raw_obj obj;
157 string lock_name;
158 string cookie;
159 uint32_t duration_secs;
160
161 protected:
162 int _send_request() override;
163 public:
164 RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
165 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
166 const string& _name, const string& _cookie, uint32_t _duration_secs);
167 };
168
169 class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
170 RGWRados *store;
171 rgw_raw_obj obj;
172 string lock_name;
173 string cookie;
174
175 protected:
176 int _send_request() override;
177 public:
178 RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
179 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
180 const string& _name, const string& _cookie);
181 };
182
183 template <class T>
184 class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
185 RGWAsyncRadosProcessor *async_rados;
186 RGWRados *store;
187 rgw_raw_obj obj;
188 T *result;
189 /// on ENOENT, call handle_data() with an empty object instead of failing
190 const bool empty_on_enoent;
191 RGWObjVersionTracker *objv_tracker;
192 RGWAsyncGetSystemObj *req{nullptr};
193
194 public:
195 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
196 const rgw_raw_obj& _obj,
197 T *_result, bool empty_on_enoent = true,
198 RGWObjVersionTracker *objv_tracker = nullptr)
199 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
200 obj(_obj), result(_result),
201 empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
202 ~RGWSimpleRadosReadCR() override {
203 request_cleanup();
204 }
205
206 void request_cleanup() override {
207 if (req) {
208 req->finish();
209 req = NULL;
210 }
211 }
212
213 int send_request() override;
214 int request_complete() override;
215
216 virtual int handle_data(T& data) {
217 return 0;
218 }
219 };
220
221 template <class T>
222 int RGWSimpleRadosReadCR<T>::send_request()
223 {
224 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
225 store, objv_tracker, obj, 0, -1, false);
226 async_rados->queue(req);
227 return 0;
228 }
229
230 template <class T>
231 int RGWSimpleRadosReadCR<T>::request_complete()
232 {
233 int ret = req->get_ret_status();
234 retcode = ret;
235 if (ret == -ENOENT && empty_on_enoent) {
236 *result = T();
237 } else {
238 if (ret < 0) {
239 return ret;
240 }
241 try {
242 bufferlist::iterator iter = req->bl.begin();
243 if (iter.end()) {
244 // allow successful reads with empty buffers. ReadSyncStatus coroutines
245 // depend on this to be able to read without locking, because the
246 // cls lock from InitSyncStatus will create an empty object if it didnt
247 // exist
248 *result = T();
249 } else {
250 ::decode(*result, iter);
251 }
252 } catch (buffer::error& err) {
253 return -EIO;
254 }
255 }
256
257 return handle_data(*result);
258 }
259
260 class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
261 RGWAsyncRadosProcessor *async_rados;
262 RGWRados *store;
263 rgw_raw_obj obj;
264 map<string, bufferlist> *pattrs;
265 RGWAsyncGetSystemObj *req;
266
267 public:
268 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
269 const rgw_raw_obj& _obj,
270 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()),
271 async_rados(_async_rados), store(_store),
272 obj(_obj),
273 pattrs(_pattrs),
274 req(NULL) { }
275 ~RGWSimpleRadosReadAttrsCR() override {
276 request_cleanup();
277 }
278
279 void request_cleanup() override {
280 if (req) {
281 req->finish();
282 req = NULL;
283 }
284 }
285
286 int send_request() override;
287 int request_complete() override;
288 };
289
290 template <class T>
291 class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
292 RGWAsyncRadosProcessor *async_rados;
293 RGWRados *store;
294 bufferlist bl;
295 rgw_raw_obj obj;
296 RGWObjVersionTracker *objv_tracker;
297 RGWAsyncPutSystemObj *req{nullptr};
298
299 public:
300 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
301 const rgw_raw_obj& _obj,
302 const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
303 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
304 store(_store), obj(_obj), objv_tracker(objv_tracker) {
305 ::encode(_data, bl);
306 }
307
308 ~RGWSimpleRadosWriteCR() override {
309 request_cleanup();
310 }
311
312 void request_cleanup() override {
313 if (req) {
314 req->finish();
315 req = NULL;
316 }
317 }
318
319 int send_request() override {
320 req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
321 store, objv_tracker, obj, false, std::move(bl));
322 async_rados->queue(req);
323 return 0;
324 }
325
326 int request_complete() override {
327 if (objv_tracker) { // copy the updated version
328 *objv_tracker = req->objv_tracker;
329 }
330 return req->get_ret_status();
331 }
332 };
333
334 class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
335 RGWAsyncRadosProcessor *async_rados;
336 RGWRados *store;
337 RGWObjVersionTracker *objv_tracker;
338 rgw_raw_obj obj;
339 map<string, bufferlist> attrs;
340 RGWAsyncPutSystemObjAttrs *req = nullptr;
341
342 public:
343 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados,
344 RGWRados *_store, const rgw_raw_obj& _obj,
345 map<string, bufferlist> _attrs,
346 RGWObjVersionTracker *objv_tracker = nullptr)
347 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
348 store(_store), objv_tracker(objv_tracker), obj(_obj),
349 attrs(std::move(_attrs)) {
350 }
351 ~RGWSimpleRadosWriteAttrsCR() override {
352 request_cleanup();
353 }
354
355 void request_cleanup() override {
356 if (req) {
357 req->finish();
358 req = NULL;
359 }
360 }
361
362 int send_request() override {
363 req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
364 store, objv_tracker, obj, std::move(attrs));
365 async_rados->queue(req);
366 return 0;
367 }
368
369 int request_complete() override {
370 if (objv_tracker) { // copy the updated version
371 *objv_tracker = req->objv_tracker;
372 }
373 return req->get_ret_status();
374 }
375 };
376
377 class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
378 RGWRados *store;
379 map<string, bufferlist> entries;
380
381 rgw_rados_ref ref;
382
383 rgw_raw_obj obj;
384
385 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
386
387 public:
388 RGWRadosSetOmapKeysCR(RGWRados *_store,
389 const rgw_raw_obj& _obj,
390 map<string, bufferlist>& _entries);
391
392 int send_request() override;
393 int request_complete() override;
394 };
395
396 class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
397 RGWRados *store;
398
399 string marker;
400 std::set<std::string> *entries;
401 int max_entries;
402
403 rgw_rados_ref ref;
404
405 rgw_raw_obj obj;
406
407 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
408
409 public:
410 RGWRadosGetOmapKeysCR(RGWRados *_store,
411 const rgw_raw_obj& _obj,
412 const string& _marker,
413 std::set<std::string> *_entries, int _max_entries);
414
415 int send_request() override;
416 int request_complete() override;
417 };
418
419 class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
420 RGWRados *store;
421
422 rgw_rados_ref ref;
423
424 set<string> keys;
425
426 rgw_raw_obj obj;
427
428 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
429
430 public:
431 RGWRadosRemoveOmapKeysCR(RGWRados *_store,
432 const rgw_raw_obj& _obj,
433 const set<string>& _keys);
434
435 int send_request() override;
436
437 int request_complete() override;
438 };
439
440 class RGWRadosRemoveCR : public RGWSimpleCoroutine {
441 RGWRados *store;
442 librados::IoCtx ioctx;
443 const rgw_raw_obj obj;
444 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
445
446 public:
447 RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
448
449 int send_request();
450 int request_complete();
451 };
452
453 class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
454 RGWAsyncRadosProcessor *async_rados;
455 RGWRados *store;
456 string lock_name;
457 string cookie;
458 uint32_t duration;
459
460 rgw_raw_obj obj;
461
462 RGWAsyncLockSystemObj *req;
463
464 public:
465 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
466 const rgw_raw_obj& _obj,
467 const string& _lock_name,
468 const string& _cookie,
469 uint32_t _duration);
470 ~RGWSimpleRadosLockCR() override {
471 request_cleanup();
472 }
473 void request_cleanup() override;
474
475 int send_request() override;
476 int request_complete() override;
477
478 static std::string gen_random_cookie(CephContext* cct) {
479 #define COOKIE_LEN 16
480 char buf[COOKIE_LEN + 1];
481 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
482 return buf;
483 }
484 };
485
486 class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
487 RGWAsyncRadosProcessor *async_rados;
488 RGWRados *store;
489 string lock_name;
490 string cookie;
491
492 rgw_raw_obj obj;
493
494 RGWAsyncUnlockSystemObj *req;
495
496 public:
497 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
498 const rgw_raw_obj& _obj,
499 const string& _lock_name,
500 const string& _cookie);
501 ~RGWSimpleRadosUnlockCR() override {
502 request_cleanup();
503 }
504 void request_cleanup() override;
505
506 int send_request() override;
507 int request_complete() override;
508 };
509
510 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
511
512 class RGWOmapAppend : public RGWConsumerCR<string> {
513 RGWAsyncRadosProcessor *async_rados;
514 RGWRados *store;
515
516 rgw_raw_obj obj;
517
518 bool going_down;
519
520 int num_pending_entries;
521 list<string> pending_entries;
522
523 map<string, bufferlist> entries;
524
525 uint64_t window_size;
526 uint64_t total_entries;
527 public:
528 RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
529 const rgw_raw_obj& _obj,
530 uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
531 int operate() override;
532 void flush_pending();
533 bool append(const string& s);
534 bool finish();
535
536 uint64_t get_total_entries() {
537 return total_entries;
538 }
539
540 const rgw_raw_obj& get_obj() {
541 return obj;
542 }
543 };
544
545 class RGWAsyncWait : public RGWAsyncRadosRequest {
546 CephContext *cct;
547 Mutex *lock;
548 Cond *cond;
549 utime_t interval;
550 protected:
551 int _send_request() override {
552 Mutex::Locker l(*lock);
553 return cond->WaitInterval(*lock, interval);
554 }
555 public:
556 RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
557 Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn),
558 cct(_cct),
559 lock(_lock), cond(_cond), interval(_secs, 0) {}
560
561 void wakeup() {
562 Mutex::Locker l(*lock);
563 cond->Signal();
564 }
565 };
566
567 class RGWWaitCR : public RGWSimpleCoroutine {
568 CephContext *cct;
569 RGWAsyncRadosProcessor *async_rados;
570 Mutex *lock;
571 Cond *cond;
572 int secs;
573
574 RGWAsyncWait *req;
575
576 public:
577 RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
578 Mutex *_lock, Cond *_cond,
579 int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
580 async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
581 }
582 ~RGWWaitCR() override {
583 request_cleanup();
584 }
585
586 void request_cleanup() override {
587 if (req) {
588 wakeup();
589 req->finish();
590 req = NULL;
591 }
592 }
593
594 int send_request() override {
595 req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct, lock, cond, secs);
596 async_rados->queue(req);
597 return 0;
598 }
599
600 int request_complete() override {
601 return req->get_ret_status();
602 }
603
604 void wakeup() {
605 req->wakeup();
606 }
607 };
608
609 class RGWShardedOmapCRManager {
610 RGWAsyncRadosProcessor *async_rados;
611 RGWRados *store;
612 RGWCoroutine *op;
613
614 int num_shards;
615
616 vector<RGWOmapAppend *> shards;
617 public:
618 RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix)
619 : async_rados(_async_rados),
620 store(_store), op(_op), num_shards(_num_shards) {
621 shards.reserve(num_shards);
622 for (int i = 0; i < num_shards; ++i) {
623 char buf[oid_prefix.size() + 16];
624 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
625 RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
626 shard->get();
627 shards.push_back(shard);
628 op->spawn(shard, false);
629 }
630 }
631
632 ~RGWShardedOmapCRManager() {
633 for (auto shard : shards) {
634 shard->put();
635 }
636 }
637
638 bool append(const string& entry, int shard_id) {
639 return shards[shard_id]->append(entry);
640 }
641 bool finish() {
642 bool success = true;
643 for (vector<RGWOmapAppend *>::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
644 success &= ((*iter)->finish() && (!(*iter)->is_error()));
645 }
646 return success;
647 }
648
649 uint64_t get_total_entries(int shard_id) {
650 return shards[shard_id]->get_total_entries();
651 }
652 };
653
654 class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
655 RGWRados *store;
656 const std::string oid;
657
658 protected:
659 int _send_request() override;
660 public:
661 RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
662 RGWRados *_store, const std::string& oid)
663 : RGWAsyncRadosRequest(caller, cn), store(_store), oid(oid) {}
664
665 RGWBucketInfo bucket_info;
666 };
667
668 class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
669 RGWAsyncRadosProcessor *async_rados;
670 RGWRados *store;
671 const std::string oid;
672 RGWBucketInfo *bucket_info;
673
674 RGWAsyncGetBucketInstanceInfo *req{nullptr};
675
676 public:
677 // metadata key constructor
678 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
679 const std::string& meta_key, RGWBucketInfo *_bucket_info)
680 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
681 oid(RGW_BUCKET_INSTANCE_MD_PREFIX + meta_key),
682 bucket_info(_bucket_info) {}
683 // rgw_bucket constructor
684 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
685 const rgw_bucket& bucket, RGWBucketInfo *_bucket_info)
686 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
687 oid(RGW_BUCKET_INSTANCE_MD_PREFIX + bucket.get_key(':')),
688 bucket_info(_bucket_info) {}
689 ~RGWGetBucketInstanceInfoCR() override {
690 request_cleanup();
691 }
692 void request_cleanup() override {
693 if (req) {
694 req->finish();
695 req = NULL;
696 }
697 }
698
699 int send_request() override {
700 req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, oid);
701 async_rados->queue(req);
702 return 0;
703 }
704 int request_complete() override {
705 if (bucket_info) {
706 *bucket_info = std::move(req->bucket_info);
707 }
708 return req->get_ret_status();
709 }
710 };
711
712 class RGWRadosBILogTrimCR : public RGWSimpleCoroutine {
713 RGWRados::BucketShard bs;
714 std::string start_marker;
715 std::string end_marker;
716 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
717 public:
718 RGWRadosBILogTrimCR(RGWRados *store, const RGWBucketInfo& bucket_info,
719 int shard_id, const std::string& start_marker,
720 const std::string& end_marker);
721
722 int send_request() override;
723 int request_complete() override;
724 };
725
726 class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
727 RGWRados *store;
728 string source_zone;
729
730 RGWBucketInfo bucket_info;
731
732 rgw_obj_key key;
733 boost::optional<uint64_t> versioned_epoch;
734
735 real_time src_mtime;
736
737 bool copy_if_newer;
738 rgw_zone_set zones_trace;
739
740 protected:
741 int _send_request() override;
742 public:
743 RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
744 const string& _source_zone,
745 RGWBucketInfo& _bucket_info,
746 const rgw_obj_key& _key,
747 boost::optional<uint64_t> _versioned_epoch,
748 bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
749 source_zone(_source_zone),
750 bucket_info(_bucket_info),
751 key(_key),
752 versioned_epoch(_versioned_epoch),
753 copy_if_newer(_if_newer)
754 {
755 if (_zones_trace) {
756 zones_trace = *_zones_trace;
757 }
758 }
759 };
760
761 class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
762 CephContext *cct;
763 RGWAsyncRadosProcessor *async_rados;
764 RGWRados *store;
765 string source_zone;
766
767 RGWBucketInfo bucket_info;
768
769 rgw_obj_key key;
770 boost::optional<uint64_t> versioned_epoch;
771
772 real_time src_mtime;
773
774 bool copy_if_newer;
775
776 RGWAsyncFetchRemoteObj *req;
777 rgw_zone_set *zones_trace;
778
779 public:
780 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
781 const string& _source_zone,
782 RGWBucketInfo& _bucket_info,
783 const rgw_obj_key& _key,
784 boost::optional<uint64_t> _versioned_epoch,
785 bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
786 async_rados(_async_rados), store(_store),
787 source_zone(_source_zone),
788 bucket_info(_bucket_info),
789 key(_key),
790 versioned_epoch(_versioned_epoch),
791 copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
792
793
794 ~RGWFetchRemoteObjCR() override {
795 request_cleanup();
796 }
797
798 void request_cleanup() override {
799 if (req) {
800 req->finish();
801 req = NULL;
802 }
803 }
804
805 int send_request() override {
806 req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
807 key, versioned_epoch, copy_if_newer, zones_trace);
808 async_rados->queue(req);
809 return 0;
810 }
811
812 int request_complete() override {
813 return req->get_ret_status();
814 }
815 };
816
817 class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
818 RGWRados *store;
819 string source_zone;
820
821 RGWBucketInfo bucket_info;
822
823 rgw_obj_key key;
824
825 ceph::real_time *pmtime;
826 uint64_t *psize;
827 map<string, bufferlist> *pattrs;
828
829 protected:
830 int _send_request() override;
831 public:
832 RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
833 const string& _source_zone,
834 RGWBucketInfo& _bucket_info,
835 const rgw_obj_key& _key,
836 ceph::real_time *_pmtime,
837 uint64_t *_psize,
838 map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
839 source_zone(_source_zone),
840 bucket_info(_bucket_info),
841 key(_key),
842 pmtime(_pmtime),
843 psize(_psize),
844 pattrs(_pattrs) {}
845 };
846
847 class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
848 CephContext *cct;
849 RGWAsyncRadosProcessor *async_rados;
850 RGWRados *store;
851 string source_zone;
852
853 RGWBucketInfo bucket_info;
854
855 rgw_obj_key key;
856
857 ceph::real_time *pmtime;
858 uint64_t *psize;
859 map<string, bufferlist> *pattrs;
860
861 RGWAsyncStatRemoteObj *req;
862
863 public:
864 RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
865 const string& _source_zone,
866 RGWBucketInfo& _bucket_info,
867 const rgw_obj_key& _key,
868 ceph::real_time *_pmtime,
869 uint64_t *_psize,
870 map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
871 async_rados(_async_rados), store(_store),
872 source_zone(_source_zone),
873 bucket_info(_bucket_info),
874 key(_key),
875 pmtime(_pmtime),
876 psize(_psize),
877 pattrs(_pattrs),
878 req(NULL) {}
879
880
881 ~RGWStatRemoteObjCR() override {
882 request_cleanup();
883 }
884
885 void request_cleanup() override {
886 if (req) {
887 req->finish();
888 req = NULL;
889 }
890 }
891
892 int send_request() override {
893 req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
894 bucket_info, key, pmtime, psize, pattrs);
895 async_rados->queue(req);
896 return 0;
897 }
898
899 int request_complete() override {
900 return req->get_ret_status();
901 }
902 };
903
904 class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
905 RGWRados *store;
906 string source_zone;
907
908 RGWBucketInfo bucket_info;
909
910 rgw_obj_key key;
911 string owner;
912 string owner_display_name;
913 bool versioned;
914 uint64_t versioned_epoch;
915 string marker_version_id;
916
917 bool del_if_older;
918 ceph::real_time timestamp;
919 rgw_zone_set zones_trace;
920
921 protected:
922 int _send_request() override;
923 public:
924 RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
925 const string& _source_zone,
926 RGWBucketInfo& _bucket_info,
927 const rgw_obj_key& _key,
928 const string& _owner,
929 const string& _owner_display_name,
930 bool _versioned,
931 uint64_t _versioned_epoch,
932 bool _delete_marker,
933 bool _if_older,
934 real_time& _timestamp,
935 rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
936 source_zone(_source_zone),
937 bucket_info(_bucket_info),
938 key(_key),
939 owner(_owner),
940 owner_display_name(_owner_display_name),
941 versioned(_versioned),
942 versioned_epoch(_versioned_epoch),
943 del_if_older(_if_older),
944 timestamp(_timestamp) {
945 if (_delete_marker) {
946 marker_version_id = key.instance;
947 }
948
949 if (_zones_trace) {
950 zones_trace = *_zones_trace;
951 }
952 }
953 };
954
955 class RGWRemoveObjCR : public RGWSimpleCoroutine {
956 CephContext *cct;
957 RGWAsyncRadosProcessor *async_rados;
958 RGWRados *store;
959 string source_zone;
960
961 RGWBucketInfo bucket_info;
962
963 rgw_obj_key key;
964 bool versioned;
965 uint64_t versioned_epoch;
966 bool delete_marker;
967 string owner;
968 string owner_display_name;
969
970 bool del_if_older;
971 real_time timestamp;
972
973 RGWAsyncRemoveObj *req;
974
975 rgw_zone_set *zones_trace;
976
977 public:
978 RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
979 const string& _source_zone,
980 RGWBucketInfo& _bucket_info,
981 const rgw_obj_key& _key,
982 bool _versioned,
983 uint64_t _versioned_epoch,
984 string *_owner,
985 string *_owner_display_name,
986 bool _delete_marker,
987 real_time *_timestamp,
988 rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
989 async_rados(_async_rados), store(_store),
990 source_zone(_source_zone),
991 bucket_info(_bucket_info),
992 key(_key),
993 versioned(_versioned),
994 versioned_epoch(_versioned_epoch),
995 delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
996 del_if_older = (_timestamp != NULL);
997 if (_timestamp) {
998 timestamp = *_timestamp;
999 }
1000
1001 if (_owner) {
1002 owner = *_owner;
1003 }
1004
1005 if (_owner_display_name) {
1006 owner_display_name = *_owner_display_name;
1007 }
1008 }
1009 ~RGWRemoveObjCR() override {
1010 request_cleanup();
1011 }
1012
1013 void request_cleanup() override {
1014 if (req) {
1015 req->finish();
1016 req = NULL;
1017 }
1018 }
1019
1020 int send_request() override {
1021 req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
1022 key, owner, owner_display_name, versioned, versioned_epoch,
1023 delete_marker, del_if_older, timestamp, zones_trace);
1024 async_rados->queue(req);
1025 return 0;
1026 }
1027
1028 int request_complete() override {
1029 return req->get_ret_status();
1030 }
1031 };
1032
1033 class RGWContinuousLeaseCR : public RGWCoroutine {
1034 RGWAsyncRadosProcessor *async_rados;
1035 RGWRados *store;
1036
1037 const rgw_raw_obj obj;
1038
1039 const string lock_name;
1040 const string cookie;
1041
1042 int interval;
1043
1044 Mutex lock;
1045 std::atomic<bool> going_down = { false };
1046 bool locked{false};
1047
1048 RGWCoroutine *caller;
1049
1050 bool aborted{false};
1051
1052 public:
1053 RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1054 const rgw_raw_obj& _obj,
1055 const string& _lock_name, int _interval, RGWCoroutine *_caller)
1056 : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
1057 obj(_obj), lock_name(_lock_name),
1058 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
1059 interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
1060 {}
1061
1062 int operate() override;
1063
1064 bool is_locked() {
1065 Mutex::Locker l(lock);
1066 return locked;
1067 }
1068
1069 void set_locked(bool status) {
1070 Mutex::Locker l(lock);
1071 locked = status;
1072 }
1073
1074 void go_down() {
1075 going_down = true;
1076 wakeup();
1077 }
1078
1079 void abort() {
1080 aborted = true;
1081 }
1082 };
1083
1084 class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
1085 RGWRados *store;
1086 list<cls_log_entry> entries;
1087
1088 string oid;
1089
1090 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1091
1092 public:
1093 RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
1094 const cls_log_entry& entry);
1095
1096 int send_request() override;
1097 int request_complete() override;
1098 };
1099
1100 class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
1101 RGWRados *store;
1102 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1103 protected:
1104 std::string oid;
1105 real_time start_time;
1106 real_time end_time;
1107 std::string from_marker;
1108 std::string to_marker;
1109
1110 public:
1111 RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
1112 const real_time& start_time, const real_time& end_time,
1113 const std::string& from_marker,
1114 const std::string& to_marker);
1115
1116 int send_request() override;
1117 int request_complete() override;
1118 };
1119
1120 // wrapper to update last_trim_marker on success
1121 class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
1122 CephContext *cct;
1123 std::string *last_trim_marker;
1124 public:
1125 RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
1126 const std::string& to_marker, std::string *last_trim_marker);
1127 int request_complete() override;
1128 };
1129
1130 class RGWAsyncStatObj : public RGWAsyncRadosRequest {
1131 RGWRados *store;
1132 RGWBucketInfo bucket_info;
1133 rgw_obj obj;
1134 uint64_t *psize;
1135 real_time *pmtime;
1136 uint64_t *pepoch;
1137 RGWObjVersionTracker *objv_tracker;
1138 protected:
1139 int _send_request() override;
1140 public:
1141 RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store,
1142 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1143 real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
1144 RGWObjVersionTracker *objv_tracker = nullptr)
1145 : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize),
1146 pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
1147 };
1148
1149 class RGWStatObjCR : public RGWSimpleCoroutine {
1150 RGWRados *store;
1151 RGWAsyncRadosProcessor *async_rados;
1152 RGWBucketInfo bucket_info;
1153 rgw_obj obj;
1154 uint64_t *psize;
1155 real_time *pmtime;
1156 uint64_t *pepoch;
1157 RGWObjVersionTracker *objv_tracker;
1158 RGWAsyncStatObj *req = nullptr;
1159 public:
1160 RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
1161 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1162 real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
1163 RGWObjVersionTracker *objv_tracker = nullptr);
1164 ~RGWStatObjCR() override {
1165 request_cleanup();
1166 }
1167 void request_cleanup() override;
1168
1169 int send_request() override;
1170 int request_complete() override;
1171 };
1172
1173 /// coroutine wrapper for IoCtx::aio_notify()
1174 class RGWRadosNotifyCR : public RGWSimpleCoroutine {
1175 RGWRados *const store;
1176 const rgw_raw_obj obj;
1177 bufferlist request;
1178 const uint64_t timeout_ms;
1179 bufferlist *response;
1180 rgw_rados_ref ref;
1181 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1182
1183 public:
1184 RGWRadosNotifyCR(RGWRados *store, const rgw_raw_obj& obj,
1185 bufferlist& request, uint64_t timeout_ms,
1186 bufferlist *response);
1187
1188 int send_request() override;
1189 int request_complete() override;
1190 };
1191
1192 #endif