]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/driver/rados/rgw_cr_rados.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / driver / rados / rgw_cr_rados.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #pragma once
5
6 #include <boost/intrusive_ptr.hpp>
7 #include "include/ceph_assert.h"
8 #include "rgw_coroutine.h"
9 #include "rgw_sal.h"
10 #include "rgw_sal_rados.h"
11 #include "common/WorkQueue.h"
12 #include "common/Throttle.h"
13
14 #include <atomic>
15 #include "common/ceph_time.h"
16
17 #include "services/svc_sys_obj.h"
18 #include "services/svc_bucket.h"
19
20 struct rgw_http_param_pair;
21 class RGWRESTConn;
22
23 class RGWAsyncRadosRequest : public RefCountedObject {
24 RGWCoroutine *caller;
25 RGWAioCompletionNotifier *notifier;
26
27 int retcode;
28
29 ceph::mutex lock = ceph::make_mutex("RGWAsyncRadosRequest::lock");
30
31 protected:
32 virtual int _send_request(const DoutPrefixProvider *dpp) = 0;
33 public:
34 RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn)
35 : caller(_caller), notifier(_cn), retcode(0) {
36 }
37 ~RGWAsyncRadosRequest() override {
38 if (notifier) {
39 notifier->put();
40 }
41 }
42
43 void send_request(const DoutPrefixProvider *dpp) {
44 get();
45 retcode = _send_request(dpp);
46 {
47 std::lock_guard l{lock};
48 if (notifier) {
49 notifier->cb(); // drops its own ref
50 notifier = nullptr;
51 }
52 }
53 put();
54 }
55
56 int get_ret_status() { return retcode; }
57
58 void finish() {
59 {
60 std::lock_guard l{lock};
61 if (notifier) {
62 // we won't call notifier->cb() to drop its ref, so drop it here
63 notifier->put();
64 notifier = nullptr;
65 }
66 }
67 put();
68 }
69 };
70
71
72 class RGWAsyncRadosProcessor {
73 std::deque<RGWAsyncRadosRequest *> m_req_queue;
74 std::atomic<bool> going_down = { false };
75 protected:
76 CephContext *cct;
77 ThreadPool m_tp;
78 Throttle req_throttle;
79
80 struct RGWWQ : public DoutPrefixProvider, public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
81 RGWAsyncRadosProcessor *processor;
82 RGWWQ(RGWAsyncRadosProcessor *p,
83 ceph::timespan timeout, ceph::timespan suicide_timeout,
84 ThreadPool *tp)
85 : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
86
87 bool _enqueue(RGWAsyncRadosRequest *req) override;
88 void _dequeue(RGWAsyncRadosRequest *req) override {
89 ceph_abort();
90 }
91 bool _empty() override;
92 RGWAsyncRadosRequest *_dequeue() override;
93 using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
94 void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
95 void _dump_queue();
96 void _clear() override {
97 ceph_assert(processor->m_req_queue.empty());
98 }
99
100 CephContext *get_cct() const { return processor->cct; }
101 unsigned get_subsys() const { return ceph_subsys_rgw; }
102 std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw async rados processor: ";}
103
104 } req_wq;
105
106 public:
107 RGWAsyncRadosProcessor(CephContext *_cct, int num_threads);
108 ~RGWAsyncRadosProcessor() {}
109 void start();
110 void stop();
111 void handle_request(const DoutPrefixProvider *dpp, RGWAsyncRadosRequest *req);
112 void queue(RGWAsyncRadosRequest *req);
113
114 bool is_going_down() {
115 return going_down;
116 }
117
118 };
119
120 template <class P>
121 class RGWSimpleWriteOnlyAsyncCR : public RGWSimpleCoroutine {
122 RGWAsyncRadosProcessor *async_rados;
123 rgw::sal::RadosStore* store;
124
125 P params;
126 const DoutPrefixProvider *dpp;
127
128 class Request : public RGWAsyncRadosRequest {
129 rgw::sal::RadosStore* store;
130 P params;
131 const DoutPrefixProvider *dpp;
132 protected:
133 int _send_request(const DoutPrefixProvider *dpp) override;
134 public:
135 Request(RGWCoroutine *caller,
136 RGWAioCompletionNotifier *cn,
137 rgw::sal::RadosStore* store,
138 const P& _params,
139 const DoutPrefixProvider *dpp) : RGWAsyncRadosRequest(caller, cn),
140 store(store),
141 params(_params),
142 dpp(dpp) {}
143 } *req{nullptr};
144
145 public:
146 RGWSimpleWriteOnlyAsyncCR(RGWAsyncRadosProcessor *_async_rados,
147 rgw::sal::RadosStore* _store,
148 const P& _params,
149 const DoutPrefixProvider *_dpp) : RGWSimpleCoroutine(_store->ctx()),
150 async_rados(_async_rados),
151 store(_store),
152 params(_params),
153 dpp(_dpp) {}
154
155 ~RGWSimpleWriteOnlyAsyncCR() override {
156 request_cleanup();
157 }
158 void request_cleanup() override {
159 if (req) {
160 req->finish();
161 req = NULL;
162 }
163 }
164
165 int send_request(const DoutPrefixProvider *dpp) override {
166 req = new Request(this,
167 stack->create_completion_notifier(),
168 store,
169 params,
170 dpp);
171
172 async_rados->queue(req);
173 return 0;
174 }
175 int request_complete() override {
176 return req->get_ret_status();
177 }
178 };
179
180
181 template <class P, class R>
182 class RGWSimpleAsyncCR : public RGWSimpleCoroutine {
183 RGWAsyncRadosProcessor *async_rados;
184 rgw::sal::RadosStore* store;
185
186 P params;
187 std::shared_ptr<R> result;
188 const DoutPrefixProvider *dpp;
189
190 class Request : public RGWAsyncRadosRequest {
191 rgw::sal::RadosStore* store;
192 P params;
193 std::shared_ptr<R> result;
194 const DoutPrefixProvider *dpp;
195 protected:
196 int _send_request(const DoutPrefixProvider *dpp) override;
197 public:
198 Request(const DoutPrefixProvider *dpp,
199 RGWCoroutine *caller,
200 RGWAioCompletionNotifier *cn,
201 rgw::sal::RadosStore* _store,
202 const P& _params,
203 std::shared_ptr<R>& _result,
204 const DoutPrefixProvider *_dpp) : RGWAsyncRadosRequest(caller, cn),
205 store(_store),
206 params(_params),
207 result(_result),
208 dpp(_dpp) {}
209 } *req{nullptr};
210
211 public:
212 RGWSimpleAsyncCR(RGWAsyncRadosProcessor *_async_rados,
213 rgw::sal::RadosStore* _store,
214 const P& _params,
215 std::shared_ptr<R>& _result,
216 const DoutPrefixProvider *_dpp) : RGWSimpleCoroutine(_store->ctx()),
217 async_rados(_async_rados),
218 store(_store),
219 params(_params),
220 result(_result),
221 dpp(_dpp) {}
222
223 ~RGWSimpleAsyncCR() override {
224 request_cleanup();
225 }
226 void request_cleanup() override {
227 if (req) {
228 req->finish();
229 req = NULL;
230 }
231 }
232
233 int send_request(const DoutPrefixProvider *dpp) override {
234 req = new Request(dpp,
235 this,
236 stack->create_completion_notifier(),
237 store,
238 params,
239 result,
240 dpp);
241
242 async_rados->queue(req);
243 return 0;
244 }
245 int request_complete() override {
246 return req->get_ret_status();
247 }
248 };
249
250 class RGWGenericAsyncCR : public RGWSimpleCoroutine {
251 RGWAsyncRadosProcessor *async_rados;
252 rgw::sal::RadosStore* store;
253
254
255 public:
256 class Action {
257 public:
258 virtual ~Action() {}
259 virtual int operate() = 0;
260 };
261
262 private:
263 std::shared_ptr<Action> action;
264
265 class Request : public RGWAsyncRadosRequest {
266 std::shared_ptr<Action> action;
267 protected:
268 int _send_request(const DoutPrefixProvider *dpp) override {
269 if (!action) {
270 return 0;
271 }
272 return action->operate();
273 }
274 public:
275 Request(const DoutPrefixProvider *dpp,
276 RGWCoroutine *caller,
277 RGWAioCompletionNotifier *cn,
278 std::shared_ptr<Action>& _action) : RGWAsyncRadosRequest(caller, cn),
279 action(_action) {}
280 } *req{nullptr};
281
282 public:
283 RGWGenericAsyncCR(CephContext *_cct,
284 RGWAsyncRadosProcessor *_async_rados,
285 std::shared_ptr<Action>& _action) : RGWSimpleCoroutine(_cct),
286 async_rados(_async_rados),
287 action(_action) {}
288 template<typename T>
289 RGWGenericAsyncCR(CephContext *_cct,
290 RGWAsyncRadosProcessor *_async_rados,
291 std::shared_ptr<T>& _action) : RGWSimpleCoroutine(_cct),
292 async_rados(_async_rados),
293 action(std::static_pointer_cast<Action>(_action)) {}
294
295 ~RGWGenericAsyncCR() override {
296 request_cleanup();
297 }
298 void request_cleanup() override {
299 if (req) {
300 req->finish();
301 req = NULL;
302 }
303 }
304
305 int send_request(const DoutPrefixProvider *dpp) override {
306 req = new Request(dpp, this,
307 stack->create_completion_notifier(),
308 action);
309
310 async_rados->queue(req);
311 return 0;
312 }
313 int request_complete() override {
314 return req->get_ret_status();
315 }
316 };
317
318
319 class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
320 const DoutPrefixProvider *dpp;
321 RGWSI_SysObj* svc_sysobj;
322 rgw_raw_obj obj;
323 const bool want_attrs;
324 const bool raw_attrs;
325 protected:
326 int _send_request(const DoutPrefixProvider *dpp) override;
327 public:
328 RGWAsyncGetSystemObj(const DoutPrefixProvider *dpp,
329 RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
330 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
331 bool want_attrs, bool raw_attrs);
332
333 bufferlist bl;
334 std::map<std::string, bufferlist> attrs;
335 RGWObjVersionTracker objv_tracker;
336 };
337
338 class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
339 const DoutPrefixProvider *dpp;
340 RGWSI_SysObj *svc;
341 rgw_raw_obj obj;
342 bool exclusive;
343 bufferlist bl;
344
345 protected:
346 int _send_request(const DoutPrefixProvider *dpp) override;
347 public:
348 RGWAsyncPutSystemObj(const DoutPrefixProvider *dpp, RGWCoroutine *caller,
349 RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
350 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
351 bool _exclusive, bufferlist _bl);
352
353 RGWObjVersionTracker objv_tracker;
354 };
355
356 class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
357 const DoutPrefixProvider *dpp;
358 RGWSI_SysObj *svc;
359 rgw_raw_obj obj;
360 std::map<std::string, bufferlist> attrs;
361 bool exclusive;
362
363 protected:
364 int _send_request(const DoutPrefixProvider *dpp) override;
365 public:
366 RGWAsyncPutSystemObjAttrs(const DoutPrefixProvider *dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
367 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
368 std::map<std::string, bufferlist> _attrs, bool exclusive);
369
370 RGWObjVersionTracker objv_tracker;
371 };
372
373 class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
374 rgw::sal::RadosStore* store;
375 rgw_raw_obj obj;
376 std::string lock_name;
377 std::string cookie;
378 uint32_t duration_secs;
379
380 protected:
381 int _send_request(const DoutPrefixProvider *dpp) override;
382 public:
383 RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
384 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
385 const std::string& _name, const std::string& _cookie, uint32_t _duration_secs);
386 };
387
388 class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
389 rgw::sal::RadosStore* store;
390 rgw_raw_obj obj;
391 std::string lock_name;
392 std::string cookie;
393
394 protected:
395 int _send_request(const DoutPrefixProvider *dpp) override;
396 public:
397 RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
398 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
399 const std::string& _name, const std::string& _cookie);
400 };
401
402 template <class T>
403 class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
404 const DoutPrefixProvider* dpp;
405 rgw::sal::RadosStore* store;
406 rgw_raw_obj obj;
407 T* result;
408 /// on ENOENT, call handle_data() with an empty object instead of failing
409 const bool empty_on_enoent;
410 RGWObjVersionTracker* objv_tracker;
411
412 T val;
413 rgw_rados_ref ref;
414 ceph::buffer::list bl;
415 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
416
417 public:
418 RGWSimpleRadosReadCR(const DoutPrefixProvider* dpp,
419 rgw::sal::RadosStore* store,
420 const rgw_raw_obj& obj,
421 T* result, bool empty_on_enoent = true,
422 RGWObjVersionTracker* objv_tracker = nullptr)
423 : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store),
424 obj(obj), result(result), empty_on_enoent(empty_on_enoent),
425 objv_tracker(objv_tracker) {
426 if (!result) {
427 result = &val;
428 }
429 }
430
431 int send_request(const DoutPrefixProvider *dpp) {
432 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
433 if (r < 0) {
434 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret="
435 << r << dendl;
436 return r;
437 }
438
439 set_status() << "sending request";
440
441 librados::ObjectReadOperation op;
442 if (objv_tracker) {
443 objv_tracker->prepare_op_for_read(&op);
444 }
445
446 op.read(0, -1, &bl, nullptr);
447
448 cn = stack->create_completion_notifier();
449 return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op,
450 nullptr);
451 }
452
453 int request_complete() {
454 int ret = cn->completion()->get_return_value();
455 set_status() << "request complete; ret=" << ret;
456
457 if (ret == -ENOENT && empty_on_enoent) {
458 *result = T();
459 } else {
460 if (ret < 0) {
461 return ret;
462 }
463 try {
464 auto iter = bl.cbegin();
465 if (iter.end()) {
466 // allow successful reads with empty buffers. ReadSyncStatus coroutines
467 // depend on this to be able to read without locking, because the
468 // cls lock from InitSyncStatus will create an empty object if it didn't
469 // exist
470 *result = T();
471 } else {
472 decode(*result, iter);
473 }
474 } catch (buffer::error& err) {
475 return -EIO;
476 }
477 }
478
479 return handle_data(*result);
480 }
481
482 virtual int handle_data(T& data) {
483 return 0;
484 }
485 };
486
487 class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
488 const DoutPrefixProvider* dpp;
489 rgw::sal::RadosStore* const store;
490
491 const rgw_raw_obj obj;
492 std::map<std::string, bufferlist>* const pattrs;
493 const bool raw_attrs;
494 RGWObjVersionTracker* const objv_tracker;
495
496 rgw_rados_ref ref;
497 std::map<std::string, bufferlist> unfiltered_attrs;
498 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
499
500 public:
501 RGWSimpleRadosReadAttrsCR(const DoutPrefixProvider* dpp,
502 rgw::sal::RadosStore* store,
503 rgw_raw_obj obj,
504 std::map<std::string, bufferlist>* pattrs,
505 bool raw_attrs,
506 RGWObjVersionTracker* objv_tracker = nullptr)
507 : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store),
508 obj(std::move(obj)), pattrs(pattrs), raw_attrs(raw_attrs),
509 objv_tracker(objv_tracker) {}
510
511 int send_request(const DoutPrefixProvider *dpp) override;
512 int request_complete() override;
513 };
514
515 template <class T>
516 class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
517 const DoutPrefixProvider* dpp;
518 rgw::sal::RadosStore* const store;
519 rgw_raw_obj obj;
520 RGWObjVersionTracker* objv_tracker;
521 bool exclusive;
522
523 bufferlist bl;
524 rgw_rados_ref ref;
525 std::map<std::string, bufferlist> unfiltered_attrs;
526 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
527
528
529 public:
530 RGWSimpleRadosWriteCR(const DoutPrefixProvider* dpp,
531 rgw::sal::RadosStore* const store,
532 rgw_raw_obj obj, const T& data,
533 RGWObjVersionTracker* objv_tracker = nullptr,
534 bool exclusive = false)
535 : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store),
536 obj(std::move(obj)), objv_tracker(objv_tracker), exclusive(exclusive) {
537 encode(data, bl);
538 }
539
540 int send_request(const DoutPrefixProvider *dpp) override {
541 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
542 if (r < 0) {
543 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret="
544 << r << dendl;
545 return r;
546 }
547
548 set_status() << "sending request";
549
550 librados::ObjectWriteOperation op;
551 if (exclusive) {
552 op.create(true);
553 }
554 if (objv_tracker) {
555 objv_tracker->prepare_op_for_write(&op);
556 }
557 op.write_full(bl);
558
559 cn = stack->create_completion_notifier();
560 return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op);
561 }
562
563 int request_complete() override {
564 int ret = cn->completion()->get_return_value();
565 set_status() << "request complete; ret=" << ret;
566 if (ret >= 0 && objv_tracker) {
567 objv_tracker->apply_write();
568 }
569 return ret;
570 }
571 };
572
573 class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
574 const DoutPrefixProvider* dpp;
575 rgw::sal::RadosStore* const store;
576 RGWObjVersionTracker* objv_tracker;
577 rgw_raw_obj obj;
578 std::map<std::string, bufferlist> attrs;
579 bool exclusive;
580
581 rgw_rados_ref ref;
582 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
583
584
585 public:
586 RGWSimpleRadosWriteAttrsCR(const DoutPrefixProvider* dpp,
587 rgw::sal::RadosStore* const store,
588 rgw_raw_obj obj,
589 std::map<std::string, bufferlist> attrs,
590 RGWObjVersionTracker* objv_tracker = nullptr,
591 bool exclusive = false)
592 : RGWSimpleCoroutine(store->ctx()), dpp(dpp),
593 store(store), objv_tracker(objv_tracker),
594 obj(std::move(obj)), attrs(std::move(attrs)),
595 exclusive(exclusive) {}
596
597 int send_request(const DoutPrefixProvider *dpp) override {
598 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
599 if (r < 0) {
600 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret="
601 << r << dendl;
602 return r;
603 }
604
605 set_status() << "sending request";
606
607 librados::ObjectWriteOperation op;
608 if (exclusive) {
609 op.create(true);
610 }
611 if (objv_tracker) {
612 objv_tracker->prepare_op_for_write(&op);
613 }
614
615 for (const auto& [name, bl] : attrs) {
616 if (!bl.length())
617 continue;
618 op.setxattr(name.c_str(), bl);
619 }
620
621 cn = stack->create_completion_notifier();
622 if (!op.size()) {
623 cn->cb();
624 return 0;
625 }
626
627 return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op);
628 }
629
630 int request_complete() override {
631 int ret = cn->completion()->get_return_value();
632 set_status() << "request complete; ret=" << ret;
633 if (ret >= 0 && objv_tracker) {
634 objv_tracker->apply_write();
635 }
636 return ret;
637 }
638 };
639
640 class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
641 rgw::sal::RadosStore* store;
642 std::map<std::string, bufferlist> entries;
643
644 rgw_rados_ref ref;
645
646 rgw_raw_obj obj;
647
648 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
649
650 public:
651 RGWRadosSetOmapKeysCR(rgw::sal::RadosStore* _store,
652 const rgw_raw_obj& _obj,
653 std::map<std::string, bufferlist>& _entries);
654
655 int send_request(const DoutPrefixProvider *dpp) override;
656 int request_complete() override;
657 };
658
659 class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
660 public:
661 struct Result {
662 rgw_rados_ref ref;
663 std::set<std::string> entries;
664 bool more = false;
665 };
666 using ResultPtr = std::shared_ptr<Result>;
667
668 RGWRadosGetOmapKeysCR(rgw::sal::RadosStore* _store, const rgw_raw_obj& _obj,
669 const std::string& _marker, int _max_entries,
670 ResultPtr result);
671
672 int send_request(const DoutPrefixProvider *dpp) override;
673 int request_complete() override;
674
675 private:
676 rgw::sal::RadosStore* store;
677 rgw_raw_obj obj;
678 std::string marker;
679 int max_entries;
680 ResultPtr result;
681 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
682 };
683
684 class RGWRadosGetOmapValsCR : public RGWSimpleCoroutine {
685 public:
686 struct Result {
687 rgw_rados_ref ref;
688 std::map<std::string, bufferlist> entries;
689 bool more = false;
690 };
691 using ResultPtr = std::shared_ptr<Result>;
692
693 RGWRadosGetOmapValsCR(rgw::sal::RadosStore* _store, const rgw_raw_obj& _obj,
694 const std::string& _marker, int _max_entries,
695 ResultPtr result);
696
697 int send_request(const DoutPrefixProvider *dpp) override;
698 int request_complete() override;
699
700 private:
701 rgw::sal::RadosStore* store;
702 rgw_raw_obj obj;
703 std::string marker;
704 int max_entries;
705 ResultPtr result;
706 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
707 };
708
709 class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
710 rgw::sal::RadosStore* store;
711
712 rgw_rados_ref ref;
713
714 std::set<std::string> keys;
715
716 rgw_raw_obj obj;
717
718 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
719
720 public:
721 RGWRadosRemoveOmapKeysCR(rgw::sal::RadosStore* _store,
722 const rgw_raw_obj& _obj,
723 const std::set<std::string>& _keys);
724
725 int send_request(const DoutPrefixProvider *dpp) override;
726
727 int request_complete() override;
728 };
729
730 class RGWRadosRemoveCR : public RGWSimpleCoroutine {
731 rgw::sal::RadosStore* store;
732 librados::IoCtx ioctx;
733 const rgw_raw_obj obj;
734 RGWObjVersionTracker* objv_tracker;
735 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
736
737 public:
738 RGWRadosRemoveCR(rgw::sal::RadosStore* store, const rgw_raw_obj& obj,
739 RGWObjVersionTracker* objv_tracker = nullptr);
740
741 int send_request(const DoutPrefixProvider *dpp) override;
742 int request_complete() override;
743 };
744
745 class RGWRadosRemoveOidCR : public RGWSimpleCoroutine {
746 librados::IoCtx ioctx;
747 const std::string oid;
748 RGWObjVersionTracker* objv_tracker;
749 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
750
751 public:
752 RGWRadosRemoveOidCR(rgw::sal::RadosStore* store,
753 librados::IoCtx&& ioctx, std::string_view oid,
754 RGWObjVersionTracker* objv_tracker = nullptr);
755
756 RGWRadosRemoveOidCR(rgw::sal::RadosStore* store,
757 RGWSI_RADOS::Obj& obj,
758 RGWObjVersionTracker* objv_tracker = nullptr);
759
760 RGWRadosRemoveOidCR(rgw::sal::RadosStore* store,
761 RGWSI_RADOS::Obj&& obj,
762 RGWObjVersionTracker* objv_tracker = nullptr);
763
764 int send_request(const DoutPrefixProvider *dpp) override;
765 int request_complete() override;
766 };
767
768 class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
769 RGWAsyncRadosProcessor *async_rados;
770 rgw::sal::RadosStore* store;
771 std::string lock_name;
772 std::string cookie;
773 uint32_t duration;
774
775 rgw_raw_obj obj;
776
777 RGWAsyncLockSystemObj *req;
778
779 public:
780 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store,
781 const rgw_raw_obj& _obj,
782 const std::string& _lock_name,
783 const std::string& _cookie,
784 uint32_t _duration);
785 ~RGWSimpleRadosLockCR() override {
786 request_cleanup();
787 }
788 void request_cleanup() override;
789
790 int send_request(const DoutPrefixProvider *dpp) override;
791 int request_complete() override;
792
793 static std::string gen_random_cookie(CephContext* cct) {
794 static constexpr std::size_t COOKIE_LEN = 16;
795 char buf[COOKIE_LEN + 1];
796 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
797 return buf;
798 }
799 };
800
801 class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
802 RGWAsyncRadosProcessor *async_rados;
803 rgw::sal::RadosStore* store;
804 std::string lock_name;
805 std::string cookie;
806
807 rgw_raw_obj obj;
808
809 RGWAsyncUnlockSystemObj *req;
810
811 public:
812 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store,
813 const rgw_raw_obj& _obj,
814 const std::string& _lock_name,
815 const std::string& _cookie);
816 ~RGWSimpleRadosUnlockCR() override {
817 request_cleanup();
818 }
819 void request_cleanup() override;
820
821 int send_request(const DoutPrefixProvider *dpp) override;
822 int request_complete() override;
823 };
824
825 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
826
827 class RGWOmapAppend : public RGWConsumerCR<std::string> {
828 RGWAsyncRadosProcessor *async_rados;
829 rgw::sal::RadosStore* store;
830
831 rgw_raw_obj obj;
832
833 bool going_down;
834
835 int num_pending_entries;
836 std::list<std::string> pending_entries;
837
838 std::map<std::string, bufferlist> entries;
839
840 uint64_t window_size;
841 uint64_t total_entries;
842 public:
843 RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store,
844 const rgw_raw_obj& _obj,
845 uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
846 int operate(const DoutPrefixProvider *dpp) override;
847 void flush_pending();
848 bool append(const std::string& s);
849 bool finish();
850
851 uint64_t get_total_entries() {
852 return total_entries;
853 }
854
855 const rgw_raw_obj& get_obj() {
856 return obj;
857 }
858 };
859
860 class RGWShardedOmapCRManager {
861 RGWAsyncRadosProcessor *async_rados;
862 rgw::sal::RadosStore* store;
863 RGWCoroutine *op;
864
865 int num_shards;
866
867 std::vector<RGWOmapAppend *> shards;
868 public:
869 RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const std::string& oid_prefix)
870 : async_rados(_async_rados),
871 store(_store), op(_op), num_shards(_num_shards) {
872 shards.reserve(num_shards);
873 for (int i = 0; i < num_shards; ++i) {
874 char buf[oid_prefix.size() + 16];
875 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
876 RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
877 shard->get();
878 shards.push_back(shard);
879 op->spawn(shard, false);
880 }
881 }
882
883 ~RGWShardedOmapCRManager() {
884 for (auto shard : shards) {
885 shard->put();
886 }
887 }
888
889 bool append(const std::string& entry, int shard_id) {
890 return shards[shard_id]->append(entry);
891 }
892 bool finish() {
893 bool success = true;
894 for (auto& append_op : shards) {
895 success &= (append_op->finish() && (!append_op->is_error()));
896 }
897 return success;
898 }
899
900 uint64_t get_total_entries(int shard_id) {
901 return shards[shard_id]->get_total_entries();
902 }
903 };
904
905 class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
906 rgw::sal::RadosStore* store;
907 rgw_bucket bucket;
908 const DoutPrefixProvider *dpp;
909
910 protected:
911 int _send_request(const DoutPrefixProvider *dpp) override;
912 public:
913 RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
914 rgw::sal::RadosStore* _store, const rgw_bucket& bucket,
915 const DoutPrefixProvider *dpp)
916 : RGWAsyncRadosRequest(caller, cn), store(_store), bucket(bucket), dpp(dpp) {}
917
918 RGWBucketInfo bucket_info;
919 std::map<std::string, bufferlist> attrs;
920 };
921
922 class RGWAsyncPutBucketInstanceInfo : public RGWAsyncRadosRequest {
923 rgw::sal::RadosStore* store;
924 RGWBucketInfo& bucket_info;
925 bool exclusive;
926 real_time mtime;
927 std::map<std::string, ceph::bufferlist>* attrs;
928 const DoutPrefixProvider *dpp;
929
930 protected:
931 int _send_request(const DoutPrefixProvider *dpp) override;
932 public:
933 RGWAsyncPutBucketInstanceInfo(RGWCoroutine* caller,
934 RGWAioCompletionNotifier* cn,
935 rgw::sal::RadosStore* store,
936 RGWBucketInfo& bucket_info,
937 bool exclusive,
938 real_time mtime,
939 std::map<std::string, ceph::bufferlist>* attrs,
940 const DoutPrefixProvider* dpp)
941 : RGWAsyncRadosRequest(caller, cn), store(store), bucket_info(bucket_info),
942 exclusive(exclusive), mtime(mtime), attrs(attrs), dpp(dpp) {}
943 };
944
945 class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
946 RGWAsyncRadosProcessor *async_rados;
947 rgw::sal::RadosStore* store;
948 rgw_bucket bucket;
949 RGWBucketInfo *bucket_info;
950 std::map<std::string, bufferlist> *pattrs;
951 const DoutPrefixProvider *dpp;
952
953 RGWAsyncGetBucketInstanceInfo *req{nullptr};
954
955 public:
956 // rgw_bucket constructor
957 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store,
958 const rgw_bucket& _bucket, RGWBucketInfo *_bucket_info,
959 std::map<std::string, bufferlist> *_pattrs, const DoutPrefixProvider *dpp)
960 : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
961 bucket(_bucket), bucket_info(_bucket_info), pattrs(_pattrs), dpp(dpp) {}
962 ~RGWGetBucketInstanceInfoCR() override {
963 request_cleanup();
964 }
965 void request_cleanup() override {
966 if (req) {
967 req->finish();
968 req = NULL;
969 }
970 }
971
972 int send_request(const DoutPrefixProvider *dpp) override {
973 req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, bucket, dpp);
974 async_rados->queue(req);
975 return 0;
976 }
977 int request_complete() override {
978 if (bucket_info) {
979 *bucket_info = std::move(req->bucket_info);
980 }
981 if (pattrs) {
982 *pattrs = std::move(req->attrs);
983 }
984 return req->get_ret_status();
985 }
986 };
987
988 class RGWPutBucketInstanceInfoCR : public RGWSimpleCoroutine {
989 RGWAsyncRadosProcessor *async_rados;
990 rgw::sal::RadosStore* store;
991 RGWBucketInfo& bucket_info;
992 bool exclusive;
993 real_time mtime;
994 std::map<std::string, ceph::bufferlist>* attrs;
995 const DoutPrefixProvider *dpp;
996
997 RGWAsyncPutBucketInstanceInfo* req = nullptr;
998
999 public:
1000 // rgw_bucket constructor
1001 RGWPutBucketInstanceInfoCR(RGWAsyncRadosProcessor *async_rados,
1002 rgw::sal::RadosStore* store,
1003 RGWBucketInfo& bucket_info,
1004 bool exclusive,
1005 real_time mtime,
1006 std::map<std::string, ceph::bufferlist>* attrs,
1007 const DoutPrefixProvider *dpp)
1008 : RGWSimpleCoroutine(store->ctx()), async_rados(async_rados), store(store),
1009 bucket_info(bucket_info), exclusive(exclusive),
1010 mtime(mtime), attrs(attrs), dpp(dpp) {}
1011 ~RGWPutBucketInstanceInfoCR() override {
1012 request_cleanup();
1013 }
1014 void request_cleanup() override {
1015 if (req) {
1016 req->finish();
1017 req = nullptr;
1018 }
1019 }
1020
1021 int send_request(const DoutPrefixProvider *dpp) override {
1022 req = new RGWAsyncPutBucketInstanceInfo(this,
1023 stack->create_completion_notifier(),
1024 store, bucket_info, exclusive,
1025 mtime, attrs, dpp);
1026 async_rados->queue(req);
1027 return 0;
1028 }
1029 int request_complete() override {
1030 return req->get_ret_status();
1031 }
1032 };
1033
1034 class RGWRadosBILogTrimCR : public RGWSimpleCoroutine {
1035 const RGWBucketInfo& bucket_info;
1036 int shard_id;
1037 const rgw::bucket_index_layout_generation generation;
1038 RGWRados::BucketShard bs;
1039 std::string start_marker;
1040 std::string end_marker;
1041 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1042 public:
1043 RGWRadosBILogTrimCR(const DoutPrefixProvider *dpp,
1044 rgw::sal::RadosStore* store, const RGWBucketInfo& bucket_info,
1045 int shard_id,
1046 const rgw::bucket_index_layout_generation& generation,
1047 const std::string& start_marker,
1048 const std::string& end_marker);
1049
1050 int send_request(const DoutPrefixProvider *dpp) override;
1051 int request_complete() override;
1052 };
1053
1054 class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
1055 rgw::sal::RadosStore* store;
1056 rgw_zone_id source_zone;
1057
1058 std::optional<rgw_user> user_id;
1059
1060 rgw_bucket src_bucket;
1061 std::optional<rgw_placement_rule> dest_placement_rule;
1062 RGWBucketInfo dest_bucket_info;
1063
1064 rgw_obj_key key;
1065 std::optional<rgw_obj_key> dest_key;
1066 std::optional<uint64_t> versioned_epoch;
1067
1068 real_time src_mtime;
1069
1070 bool copy_if_newer;
1071 std::shared_ptr<RGWFetchObjFilter> filter;
1072 rgw_zone_set_entry source_trace_entry;
1073 rgw_zone_set zones_trace;
1074 PerfCounters* counters;
1075 const DoutPrefixProvider *dpp;
1076
1077 protected:
1078 int _send_request(const DoutPrefixProvider *dpp) override;
1079 public:
1080 RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
1081 const rgw_zone_id& _source_zone,
1082 std::optional<rgw_user>& _user_id,
1083 const rgw_bucket& _src_bucket,
1084 std::optional<rgw_placement_rule> _dest_placement_rule,
1085 const RGWBucketInfo& _dest_bucket_info,
1086 const rgw_obj_key& _key,
1087 const std::optional<rgw_obj_key>& _dest_key,
1088 std::optional<uint64_t> _versioned_epoch,
1089 bool _if_newer,
1090 std::shared_ptr<RGWFetchObjFilter> _filter,
1091 const rgw_zone_set_entry& source_trace_entry,
1092 rgw_zone_set *_zones_trace,
1093 PerfCounters* counters, const DoutPrefixProvider *dpp)
1094 : RGWAsyncRadosRequest(caller, cn), store(_store),
1095 source_zone(_source_zone),
1096 user_id(_user_id),
1097 src_bucket(_src_bucket),
1098 dest_placement_rule(_dest_placement_rule),
1099 dest_bucket_info(_dest_bucket_info),
1100 key(_key),
1101 dest_key(_dest_key),
1102 versioned_epoch(_versioned_epoch),
1103 copy_if_newer(_if_newer),
1104 filter(_filter),
1105 source_trace_entry(source_trace_entry),
1106 counters(counters),
1107 dpp(dpp)
1108 {
1109 if (_zones_trace) {
1110 zones_trace = *_zones_trace;
1111 }
1112 }
1113 };
1114
1115 class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
1116 CephContext *cct;
1117 RGWAsyncRadosProcessor *async_rados;
1118 rgw::sal::RadosStore* store;
1119 rgw_zone_id source_zone;
1120
1121 std::optional<rgw_user> user_id;
1122
1123 rgw_bucket src_bucket;
1124 std::optional<rgw_placement_rule> dest_placement_rule;
1125 RGWBucketInfo dest_bucket_info;
1126
1127 rgw_obj_key key;
1128 std::optional<rgw_obj_key> dest_key;
1129 std::optional<uint64_t> versioned_epoch;
1130
1131 real_time src_mtime;
1132
1133 bool copy_if_newer;
1134
1135 std::shared_ptr<RGWFetchObjFilter> filter;
1136
1137 RGWAsyncFetchRemoteObj *req;
1138 const rgw_zone_set_entry& source_trace_entry;
1139 rgw_zone_set *zones_trace;
1140 PerfCounters* counters;
1141 const DoutPrefixProvider *dpp;
1142
1143 public:
1144 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store,
1145 const rgw_zone_id& _source_zone,
1146 std::optional<rgw_user> _user_id,
1147 const rgw_bucket& _src_bucket,
1148 std::optional<rgw_placement_rule> _dest_placement_rule,
1149 const RGWBucketInfo& _dest_bucket_info,
1150 const rgw_obj_key& _key,
1151 const std::optional<rgw_obj_key>& _dest_key,
1152 std::optional<uint64_t> _versioned_epoch,
1153 bool _if_newer,
1154 std::shared_ptr<RGWFetchObjFilter> _filter,
1155 const rgw_zone_set_entry& source_trace_entry,
1156 rgw_zone_set *_zones_trace,
1157 PerfCounters* counters, const DoutPrefixProvider *dpp)
1158 : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
1159 async_rados(_async_rados), store(_store),
1160 source_zone(_source_zone),
1161 user_id(_user_id),
1162 src_bucket(_src_bucket),
1163 dest_placement_rule(_dest_placement_rule),
1164 dest_bucket_info(_dest_bucket_info),
1165 key(_key),
1166 dest_key(_dest_key),
1167 versioned_epoch(_versioned_epoch),
1168 copy_if_newer(_if_newer),
1169 filter(_filter),
1170 req(NULL),
1171 source_trace_entry(source_trace_entry),
1172 zones_trace(_zones_trace), counters(counters), dpp(dpp) {}
1173
1174
1175 ~RGWFetchRemoteObjCR() override {
1176 request_cleanup();
1177 }
1178
1179 void request_cleanup() override {
1180 if (req) {
1181 req->finish();
1182 req = NULL;
1183 }
1184 }
1185
1186 int send_request(const DoutPrefixProvider *dpp) override {
1187 req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store,
1188 source_zone, user_id, src_bucket, dest_placement_rule, dest_bucket_info,
1189 key, dest_key, versioned_epoch, copy_if_newer, filter,
1190 source_trace_entry, zones_trace, counters, dpp);
1191 async_rados->queue(req);
1192 return 0;
1193 }
1194
1195 int request_complete() override {
1196 return req->get_ret_status();
1197 }
1198 };
1199
1200 class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
1201 rgw::sal::RadosStore* store;
1202 rgw_zone_id source_zone;
1203
1204 rgw_bucket src_bucket;
1205 rgw_obj_key key;
1206
1207 ceph::real_time *pmtime;
1208 uint64_t *psize;
1209 std::string *petag;
1210 std::map<std::string, bufferlist> *pattrs;
1211 std::map<std::string, std::string> *pheaders;
1212
1213 protected:
1214 int _send_request(const DoutPrefixProvider *dpp) override;
1215 public:
1216 RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
1217 const rgw_zone_id& _source_zone,
1218 rgw_bucket& _src_bucket,
1219 const rgw_obj_key& _key,
1220 ceph::real_time *_pmtime,
1221 uint64_t *_psize,
1222 std::string *_petag,
1223 std::map<std::string, bufferlist> *_pattrs,
1224 std::map<std::string, std::string> *_pheaders) : RGWAsyncRadosRequest(caller, cn), store(_store),
1225 source_zone(_source_zone),
1226 src_bucket(_src_bucket),
1227 key(_key),
1228 pmtime(_pmtime),
1229 psize(_psize),
1230 petag(_petag),
1231 pattrs(_pattrs),
1232 pheaders(_pheaders) {}
1233 };
1234
1235 class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
1236 CephContext *cct;
1237 RGWAsyncRadosProcessor *async_rados;
1238 rgw::sal::RadosStore* store;
1239 rgw_zone_id source_zone;
1240
1241 rgw_bucket src_bucket;
1242 rgw_obj_key key;
1243
1244 ceph::real_time *pmtime;
1245 uint64_t *psize;
1246 std::string *petag;
1247 std::map<std::string, bufferlist> *pattrs;
1248 std::map<std::string, std::string> *pheaders;
1249
1250 RGWAsyncStatRemoteObj *req;
1251
1252 public:
1253 RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store,
1254 const rgw_zone_id& _source_zone,
1255 rgw_bucket& _src_bucket,
1256 const rgw_obj_key& _key,
1257 ceph::real_time *_pmtime,
1258 uint64_t *_psize,
1259 std::string *_petag,
1260 std::map<std::string, bufferlist> *_pattrs,
1261 std::map<std::string, std::string> *_pheaders) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
1262 async_rados(_async_rados), store(_store),
1263 source_zone(_source_zone),
1264 src_bucket(_src_bucket),
1265 key(_key),
1266 pmtime(_pmtime),
1267 psize(_psize),
1268 petag(_petag),
1269 pattrs(_pattrs),
1270 pheaders(_pheaders),
1271 req(NULL) {}
1272
1273
1274 ~RGWStatRemoteObjCR() override {
1275 request_cleanup();
1276 }
1277
1278 void request_cleanup() override {
1279 if (req) {
1280 req->finish();
1281 req = NULL;
1282 }
1283 }
1284
1285 int send_request(const DoutPrefixProvider *dpp) override {
1286 req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
1287 src_bucket, key, pmtime, psize, petag, pattrs, pheaders);
1288 async_rados->queue(req);
1289 return 0;
1290 }
1291
1292 int request_complete() override {
1293 return req->get_ret_status();
1294 }
1295 };
1296
1297 class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
1298 const DoutPrefixProvider *dpp;
1299 rgw::sal::RadosStore* store;
1300 rgw_zone_id source_zone;
1301
1302 std::unique_ptr<rgw::sal::Bucket> bucket;
1303 std::unique_ptr<rgw::sal::Object> obj;
1304
1305 std::string owner;
1306 std::string owner_display_name;
1307 bool versioned;
1308 uint64_t versioned_epoch;
1309 std::string marker_version_id;
1310
1311 bool del_if_older;
1312 ceph::real_time timestamp;
1313 rgw_zone_set zones_trace;
1314
1315 protected:
1316 int _send_request(const DoutPrefixProvider *dpp) override;
1317 public:
1318 RGWAsyncRemoveObj(const DoutPrefixProvider *_dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
1319 rgw::sal::RadosStore* _store,
1320 const rgw_zone_id& _source_zone,
1321 RGWBucketInfo& _bucket_info,
1322 const rgw_obj_key& _key,
1323 const std::string& _owner,
1324 const std::string& _owner_display_name,
1325 bool _versioned,
1326 uint64_t _versioned_epoch,
1327 bool _delete_marker,
1328 bool _if_older,
1329 real_time& _timestamp,
1330 rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), dpp(_dpp), store(_store),
1331 source_zone(_source_zone),
1332 owner(_owner),
1333 owner_display_name(_owner_display_name),
1334 versioned(_versioned),
1335 versioned_epoch(_versioned_epoch),
1336 del_if_older(_if_older),
1337 timestamp(_timestamp) {
1338 if (_delete_marker) {
1339 marker_version_id = _key.instance;
1340 }
1341
1342 if (_zones_trace) {
1343 zones_trace = *_zones_trace;
1344 }
1345 store->get_bucket(nullptr, _bucket_info, &bucket);
1346 obj = bucket->get_object(_key);
1347 }
1348 };
1349
1350 class RGWRemoveObjCR : public RGWSimpleCoroutine {
1351 const DoutPrefixProvider *dpp;
1352 CephContext *cct;
1353 RGWAsyncRadosProcessor *async_rados;
1354 rgw::sal::RadosStore* store;
1355 rgw_zone_id source_zone;
1356
1357 RGWBucketInfo bucket_info;
1358
1359 rgw_obj_key key;
1360 bool versioned;
1361 uint64_t versioned_epoch;
1362 bool delete_marker;
1363 std::string owner;
1364 std::string owner_display_name;
1365
1366 bool del_if_older;
1367 real_time timestamp;
1368
1369 RGWAsyncRemoveObj *req;
1370
1371 rgw_zone_set *zones_trace;
1372
1373 public:
1374 RGWRemoveObjCR(const DoutPrefixProvider *_dpp, RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store,
1375 const rgw_zone_id& _source_zone,
1376 RGWBucketInfo& _bucket_info,
1377 const rgw_obj_key& _key,
1378 bool _versioned,
1379 uint64_t _versioned_epoch,
1380 std::string *_owner,
1381 std::string *_owner_display_name,
1382 bool _delete_marker,
1383 real_time *_timestamp,
1384 rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), dpp(_dpp), cct(_store->ctx()),
1385 async_rados(_async_rados), store(_store),
1386 source_zone(_source_zone),
1387 bucket_info(_bucket_info),
1388 key(_key),
1389 versioned(_versioned),
1390 versioned_epoch(_versioned_epoch),
1391 delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
1392 del_if_older = (_timestamp != NULL);
1393 if (_timestamp) {
1394 timestamp = *_timestamp;
1395 }
1396
1397 if (_owner) {
1398 owner = *_owner;
1399 }
1400
1401 if (_owner_display_name) {
1402 owner_display_name = *_owner_display_name;
1403 }
1404 }
1405 ~RGWRemoveObjCR() override {
1406 request_cleanup();
1407 }
1408
1409 void request_cleanup() override {
1410 if (req) {
1411 req->finish();
1412 req = NULL;
1413 }
1414 }
1415
1416 int send_request(const DoutPrefixProvider *dpp) override {
1417 req = new RGWAsyncRemoveObj(dpp, this, stack->create_completion_notifier(), store, source_zone, bucket_info,
1418 key, owner, owner_display_name, versioned, versioned_epoch,
1419 delete_marker, del_if_older, timestamp, zones_trace);
1420 async_rados->queue(req);
1421 return 0;
1422 }
1423
1424 int request_complete() override {
1425 return req->get_ret_status();
1426 }
1427 };
1428
1429 /// \brief Collect average latency
1430 ///
1431 /// Used in data sync to back off on concurrency when latency of lock
1432 /// operations rises.
1433 ///
1434 /// \warning This class is not thread safe. We do not use a mutex
1435 /// because all coroutines spawned by RGWDataSyncCR share a single thread.
1436 class LatencyMonitor {
1437 ceph::timespan total;
1438 std::uint64_t count = 0;
1439
1440 public:
1441
1442 LatencyMonitor() = default;
1443 void add_latency(ceph::timespan latency) {
1444 total += latency;
1445 ++count;
1446 }
1447
1448 ceph::timespan avg_latency() {
1449 using namespace std::literals;
1450 return count == 0 ? 0s : total / count;
1451 }
1452 };
1453
1454 class RGWContinuousLeaseCR : public RGWCoroutine {
1455 RGWAsyncRadosProcessor* async_rados;
1456 rgw::sal::RadosStore* store;
1457
1458 const rgw_raw_obj obj;
1459
1460 const std::string lock_name;
1461 const std::string cookie{RGWSimpleRadosLockCR::gen_random_cookie(cct)};
1462
1463 int interval;
1464 bool going_down{false};
1465 bool locked{false};
1466
1467 const ceph::timespan interval_tolerance;
1468 const ceph::timespan ts_interval;
1469
1470 RGWCoroutine* caller;
1471
1472 bool aborted{false};
1473
1474 ceph::coarse_mono_time last_renew_try_time;
1475 ceph::coarse_mono_time current_time;
1476
1477 LatencyMonitor* latency;
1478
1479 public:
1480 RGWContinuousLeaseCR(RGWAsyncRadosProcessor* async_rados,
1481 rgw::sal::RadosStore* _store,
1482 rgw_raw_obj obj, std::string lock_name,
1483 int interval, RGWCoroutine* caller,
1484 LatencyMonitor* const latency)
1485 : RGWCoroutine(_store->ctx()), async_rados(async_rados), store(_store),
1486 obj(std::move(obj)), lock_name(std::move(lock_name)),
1487 interval(interval), interval_tolerance(ceph::make_timespan(9*interval/10)),
1488 ts_interval(ceph::make_timespan(interval)), caller(caller), latency(latency)
1489 {}
1490
1491 virtual ~RGWContinuousLeaseCR() override;
1492
1493 int operate(const DoutPrefixProvider *dpp) override;
1494
1495 bool is_locked() const {
1496 if (ceph::coarse_mono_clock::now() - last_renew_try_time > ts_interval) {
1497 return false;
1498 }
1499 return locked;
1500 }
1501
1502 void set_locked(bool status) {
1503 locked = status;
1504 }
1505
1506 void go_down() {
1507 going_down = true;
1508 wakeup();
1509 }
1510
1511 void abort() {
1512 aborted = true;
1513 }
1514 };
1515
1516 class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
1517 const DoutPrefixProvider *dpp;
1518 rgw::sal::RadosStore* store;
1519 std::list<cls_log_entry> entries;
1520
1521 std::string oid;
1522
1523 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1524
1525 public:
1526 RGWRadosTimelogAddCR(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* _store, const std::string& _oid,
1527 const cls_log_entry& entry);
1528
1529 int send_request(const DoutPrefixProvider *dpp) override;
1530 int request_complete() override;
1531 };
1532
1533 class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
1534 const DoutPrefixProvider *dpp;
1535 rgw::sal::RadosStore* store;
1536 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1537 protected:
1538 std::string oid;
1539 real_time start_time;
1540 real_time end_time;
1541 std::string from_marker;
1542 std::string to_marker;
1543
1544 public:
1545 RGWRadosTimelogTrimCR(const DoutPrefixProvider *dpp,
1546 rgw::sal::RadosStore* store, const std::string& oid,
1547 const real_time& start_time, const real_time& end_time,
1548 const std::string& from_marker,
1549 const std::string& to_marker);
1550
1551 int send_request(const DoutPrefixProvider *dpp) override;
1552 int request_complete() override;
1553 };
1554
1555 // wrapper to update last_trim_marker on success
1556 class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
1557 CephContext *cct;
1558 std::string *last_trim_marker;
1559 public:
1560 static constexpr const char* max_marker = "99999999";
1561
1562 RGWSyncLogTrimCR(const DoutPrefixProvider *dpp,
1563 rgw::sal::RadosStore* store, const std::string& oid,
1564 const std::string& to_marker, std::string *last_trim_marker);
1565 int request_complete() override;
1566 };
1567
1568 class RGWAsyncStatObj : public RGWAsyncRadosRequest {
1569 const DoutPrefixProvider *dpp;
1570 rgw::sal::RadosStore* store;
1571 RGWBucketInfo bucket_info;
1572 rgw_obj obj;
1573 uint64_t *psize;
1574 real_time *pmtime;
1575 uint64_t *pepoch;
1576 RGWObjVersionTracker *objv_tracker;
1577 protected:
1578 int _send_request(const DoutPrefixProvider *dpp) override;
1579 public:
1580 RGWAsyncStatObj(const DoutPrefixProvider *dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* store,
1581 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1582 real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
1583 RGWObjVersionTracker *objv_tracker = nullptr)
1584 : RGWAsyncRadosRequest(caller, cn), dpp(dpp), store(store), obj(obj), psize(psize),
1585 pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
1586 };
1587
1588 class RGWStatObjCR : public RGWSimpleCoroutine {
1589 const DoutPrefixProvider *dpp;
1590 rgw::sal::RadosStore* store;
1591 RGWAsyncRadosProcessor *async_rados;
1592 RGWBucketInfo bucket_info;
1593 rgw_obj obj;
1594 uint64_t *psize;
1595 real_time *pmtime;
1596 uint64_t *pepoch;
1597 RGWObjVersionTracker *objv_tracker;
1598 RGWAsyncStatObj *req = nullptr;
1599 public:
1600 RGWStatObjCR(const DoutPrefixProvider *dpp, RGWAsyncRadosProcessor *async_rados, rgw::sal::RadosStore* store,
1601 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1602 real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
1603 RGWObjVersionTracker *objv_tracker = nullptr);
1604 ~RGWStatObjCR() override {
1605 request_cleanup();
1606 }
1607 void request_cleanup() override;
1608
1609 int send_request(const DoutPrefixProvider *dpp) override;
1610 int request_complete() override;
1611 };
1612
1613 /// coroutine wrapper for IoCtx::aio_notify()
1614 class RGWRadosNotifyCR : public RGWSimpleCoroutine {
1615 rgw::sal::RadosStore* const store;
1616 const rgw_raw_obj obj;
1617 bufferlist request;
1618 const uint64_t timeout_ms;
1619 bufferlist *response;
1620 rgw_rados_ref ref;
1621 boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1622
1623 public:
1624 RGWRadosNotifyCR(rgw::sal::RadosStore* store, const rgw_raw_obj& obj,
1625 bufferlist& request, uint64_t timeout_ms,
1626 bufferlist *response);
1627
1628 int send_request(const DoutPrefixProvider *dpp) override;
1629 int request_complete() override;
1630 };
1631
1632 class RGWDataPostNotifyCR : public RGWCoroutine {
1633 RGWRados *store;
1634 RGWHTTPManager& http_manager;
1635 bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& shards;
1636 const char *source_zone;
1637 RGWRESTConn *conn;
1638
1639 public:
1640 RGWDataPostNotifyCR(RGWRados *_store, RGWHTTPManager& _http_manager, bc::flat_map<int,
1641 bc::flat_set<rgw_data_notify_entry> >& _shards, const char *_zone, RGWRESTConn *_conn)
1642 : RGWCoroutine(_store->ctx()), store(_store), http_manager(_http_manager),
1643 shards(_shards), source_zone(_zone), conn(_conn) {}
1644
1645 int operate(const DoutPrefixProvider* dpp) override;
1646 };
1647