]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_cr_rados.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / rgw / rgw_cr_rados.cc
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
11fdf7f2 3
a8e16298 4#include "include/compat.h"
9f95a23c 5#include "rgw_sal.h"
11fdf7f2 6#include "rgw_zone.h"
7c673cae 7#include "rgw_coroutine.h"
7c673cae 8#include "rgw_cr_rados.h"
81eedcae 9#include "rgw_sync_counters.h"
9f95a23c 10#include "rgw_bucket.h"
7c673cae 11
11fdf7f2
TL
12#include "services/svc_zone.h"
13#include "services/svc_zone_utils.h"
14#include "services/svc_sys_obj.h"
9f95a23c 15#include "services/svc_cls.h"
11fdf7f2 16
7c673cae 17#include "cls/lock/cls_lock_client.h"
b32b8144 18#include "cls/rgw/cls_rgw_client.h"
7c673cae 19
31f18b77
FG
20#include <boost/asio/yield.hpp>
21
7c673cae
FG
22#define dout_context g_ceph_context
23#define dout_subsys ceph_subsys_rgw
24
20effc67
TL
25using namespace std;
26
7c673cae
FG
27bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest *req) {
28 if (processor->is_going_down()) {
29 return false;
30 }
31 req->get();
32 processor->m_req_queue.push_back(req);
33 dout(20) << "enqueued request req=" << hex << req << dec << dendl;
34 _dump_queue();
35 return true;
36}
37
38bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
39 return processor->m_req_queue.empty();
40}
41
42RGWAsyncRadosRequest *RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
43 if (processor->m_req_queue.empty())
44 return NULL;
45 RGWAsyncRadosRequest *req = processor->m_req_queue.front();
46 processor->m_req_queue.pop_front();
47 dout(20) << "dequeued request req=" << hex << req << dec << dendl;
48 _dump_queue();
49 return req;
50}
51
52void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) {
b3b6e05e 53 processor->handle_request(this, req);
7c673cae
FG
54 processor->req_throttle.put(1);
55}
56
57void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
11fdf7f2 58 if (!g_conf()->subsys.should_gather<ceph_subsys_rgw, 20>()) {
7c673cae
FG
59 return;
60 }
61 deque<RGWAsyncRadosRequest *>::iterator iter;
62 if (processor->m_req_queue.empty()) {
63 dout(20) << "RGWWQ: empty" << dendl;
64 return;
65 }
66 dout(20) << "RGWWQ:" << dendl;
67 for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) {
68 dout(20) << "req: " << hex << *iter << dec << dendl;
69 }
70}
71
9f95a23c
TL
72RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(CephContext *_cct, int num_threads)
73 : cct(_cct), m_tp(cct, "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
74 req_throttle(_cct, "rgw_async_rados_ops", num_threads * 2),
f67539c2
TL
75 req_wq(this,
76 ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
77 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
78 &m_tp) {
7c673cae
FG
79}
80
81void RGWAsyncRadosProcessor::start() {
82 m_tp.start();
83}
84
85void RGWAsyncRadosProcessor::stop() {
86 going_down = true;
87 m_tp.drain(&req_wq);
88 m_tp.stop();
89 for (auto iter = m_req_queue.begin(); iter != m_req_queue.end(); ++iter) {
90 (*iter)->put();
91 }
92}
93
b3b6e05e
TL
94void RGWAsyncRadosProcessor::handle_request(const DoutPrefixProvider *dpp, RGWAsyncRadosRequest *req) {
95 req->send_request(dpp);
7c673cae
FG
96 req->put();
97}
98
99void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest *req) {
100 req_throttle.get(1);
101 req_wq.queue(req);
102}
103
b3b6e05e 104int RGWAsyncGetSystemObj::_send_request(const DoutPrefixProvider *dpp)
7c673cae 105{
91327a77
AA
106 map<string, bufferlist> *pattrs = want_attrs ? &attrs : nullptr;
107
11fdf7f2
TL
108 auto sysobj = obj_ctx.get_obj(obj);
109 return sysobj.rop()
110 .set_objv_tracker(&objv_tracker)
111 .set_attrs(pattrs)
112 .set_raw_attrs(raw_attrs)
b3b6e05e 113 .read(dpp, &bl, null_yield);
7c673cae
FG
114}
115
b3b6e05e 116RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(const DoutPrefixProvider *_dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
7c673cae 117 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
11fdf7f2 118 bool want_attrs, bool raw_attrs)
b3b6e05e 119 : RGWAsyncRadosRequest(caller, cn), dpp(_dpp), obj_ctx(_svc),
11fdf7f2 120 obj(_obj), want_attrs(want_attrs), raw_attrs(raw_attrs)
7c673cae 121{
91327a77
AA
122 if (_objv_tracker) {
123 objv_tracker = *_objv_tracker;
124 }
7c673cae
FG
125}
126
b3b6e05e 127int RGWSimpleRadosReadAttrsCR::send_request(const DoutPrefixProvider *dpp)
7c673cae 128{
b3b6e05e 129 req = new RGWAsyncGetSystemObj(dpp, this, stack->create_completion_notifier(),
f67539c2 130 svc, objv_tracker, obj, true, raw_attrs);
7c673cae
FG
131 async_rados->queue(req);
132 return 0;
133}
134
135int RGWSimpleRadosReadAttrsCR::request_complete()
136{
91327a77
AA
137 if (pattrs) {
138 *pattrs = std::move(req->attrs);
139 }
f67539c2
TL
140 if (objv_tracker) {
141 *objv_tracker = req->objv_tracker;
142 }
7c673cae
FG
143 return req->get_ret_status();
144}
145
b3b6e05e 146int RGWAsyncPutSystemObj::_send_request(const DoutPrefixProvider *dpp)
7c673cae 147{
11fdf7f2
TL
148 auto obj_ctx = svc->init_obj_ctx();
149 auto sysobj = obj_ctx.get_obj(obj);
150 return sysobj.wop()
151 .set_objv_tracker(&objv_tracker)
152 .set_exclusive(exclusive)
b3b6e05e 153 .write_data(dpp, bl, null_yield);
7c673cae
FG
154}
155
b3b6e05e
TL
156RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(const DoutPrefixProvider *_dpp,
157 RGWCoroutine *caller,
158 RGWAioCompletionNotifier *cn,
11fdf7f2 159 RGWSI_SysObj *_svc,
91327a77
AA
160 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
161 bool _exclusive, bufferlist _bl)
b3b6e05e 162 : RGWAsyncRadosRequest(caller, cn), dpp(_dpp), svc(_svc),
91327a77 163 obj(_obj), exclusive(_exclusive), bl(std::move(_bl))
7c673cae 164{
91327a77
AA
165 if (_objv_tracker) {
166 objv_tracker = *_objv_tracker;
167 }
7c673cae
FG
168}
169
b3b6e05e 170int RGWAsyncPutSystemObjAttrs::_send_request(const DoutPrefixProvider *dpp)
7c673cae 171{
11fdf7f2
TL
172 auto obj_ctx = svc->init_obj_ctx();
173 auto sysobj = obj_ctx.get_obj(obj);
174 return sysobj.wop()
175 .set_objv_tracker(&objv_tracker)
176 .set_exclusive(false)
177 .set_attrs(attrs)
b3b6e05e 178 .write_attrs(dpp, null_yield);
7c673cae
FG
179}
180
b3b6e05e 181RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(const DoutPrefixProvider *_dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
11fdf7f2 182 RGWSI_SysObj *_svc,
7c673cae 183 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
91327a77 184 map<string, bufferlist> _attrs)
b3b6e05e 185 : RGWAsyncRadosRequest(caller, cn), dpp(_dpp), svc(_svc),
91327a77 186 obj(_obj), attrs(std::move(_attrs))
7c673cae 187{
11fdf7f2
TL
188 if (_objv_tracker) {
189 objv_tracker = *_objv_tracker;
190 }
7c673cae
FG
191}
192
193
20effc67 194RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store, const rgw_raw_obj& _obj,
7c673cae
FG
195 uint64_t _window_size)
196 : RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
197 store(_store), obj(_obj), going_down(false), num_pending_entries(0), window_size(_window_size), total_entries(0)
198{
199}
200
b3b6e05e 201int RGWAsyncLockSystemObj::_send_request(const DoutPrefixProvider *dpp)
7c673cae
FG
202{
203 rgw_rados_ref ref;
b3b6e05e 204 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
7c673cae 205 if (r < 0) {
b3b6e05e 206 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
7c673cae
FG
207 return r;
208 }
209
210 rados::cls::lock::Lock l(lock_name);
211 utime_t duration(duration_secs, 0);
212 l.set_duration(duration);
213 l.set_cookie(cookie);
f64942e4 214 l.set_may_renew(true);
7c673cae 215
9f95a23c 216 return l.lock_exclusive(&ref.pool.ioctx(), ref.obj.oid);
7c673cae
FG
217}
218
20effc67 219RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
7c673cae
FG
220 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
221 const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(caller, cn), store(_store),
222 obj(_obj),
223 lock_name(_name),
224 cookie(_cookie),
225 duration_secs(_duration_secs)
226{
227}
228
b3b6e05e 229int RGWAsyncUnlockSystemObj::_send_request(const DoutPrefixProvider *dpp)
7c673cae
FG
230{
231 rgw_rados_ref ref;
b3b6e05e 232 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
7c673cae 233 if (r < 0) {
b3b6e05e 234 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
7c673cae
FG
235 return r;
236 }
237
238 rados::cls::lock::Lock l(lock_name);
239
240 l.set_cookie(cookie);
241
9f95a23c 242 return l.unlock(&ref.pool.ioctx(), ref.obj.oid);
7c673cae
FG
243}
244
20effc67 245RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
7c673cae
FG
246 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
247 const string& _name, const string& _cookie) : RGWAsyncRadosRequest(caller, cn), store(_store),
248 obj(_obj),
249 lock_name(_name), cookie(_cookie)
250{
251}
252
20effc67 253RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(rgw::sal::RadosStore* _store,
7c673cae
FG
254 const rgw_raw_obj& _obj,
255 map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
256 store(_store),
257 entries(_entries),
258 obj(_obj), cn(NULL)
259{
260 stringstream& s = set_description();
261 s << "set omap keys dest=" << obj << " keys=[" << s.str() << "]";
262 for (auto i = entries.begin(); i != entries.end(); ++i) {
263 if (i != entries.begin()) {
264 s << ", ";
265 }
266 s << i->first;
267 }
268 s << "]";
269}
270
b3b6e05e 271int RGWRadosSetOmapKeysCR::send_request(const DoutPrefixProvider *dpp)
7c673cae 272{
b3b6e05e 273 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
7c673cae 274 if (r < 0) {
b3b6e05e 275 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
7c673cae
FG
276 return r;
277 }
278
279 set_status() << "sending request";
280
281 librados::ObjectWriteOperation op;
282 op.omap_set(entries);
283
284 cn = stack->create_completion_notifier();
9f95a23c 285 return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op);
7c673cae
FG
286}
287
288int RGWRadosSetOmapKeysCR::request_complete()
289{
290 int r = cn->completion()->get_return_value();
291
292 set_status() << "request complete; ret=" << r;
293
294 return r;
295}
296
20effc67 297RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(rgw::sal::RadosStore* _store,
7c673cae
FG
298 const rgw_raw_obj& _obj,
299 const string& _marker,
11fdf7f2
TL
300 int _max_entries,
301 ResultPtr _result)
302 : RGWSimpleCoroutine(_store->ctx()), store(_store), obj(_obj),
303 marker(_marker), max_entries(_max_entries),
304 result(std::move(_result))
7c673cae 305{
11fdf7f2
TL
306 ceph_assert(result); // must be allocated
307 set_description() << "get omap keys dest=" << obj << " marker=" << marker;
7c673cae
FG
308}
309
b3b6e05e
TL
310int RGWRadosGetOmapKeysCR::send_request(const DoutPrefixProvider *dpp) {
311 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &result->ref);
7c673cae 312 if (r < 0) {
b3b6e05e 313 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
7c673cae
FG
314 return r;
315 }
316
317 set_status() << "send request";
318
319 librados::ObjectReadOperation op;
11fdf7f2 320 op.omap_get_keys2(marker, max_entries, &result->entries, &result->more, nullptr);
7c673cae 321
11fdf7f2 322 cn = stack->create_completion_notifier(result);
9f95a23c 323 return result->ref.pool.ioctx().aio_operate(result->ref.obj.oid, cn->completion(), &op, NULL);
7c673cae
FG
324}
325
28e407b8
AA
326int RGWRadosGetOmapKeysCR::request_complete()
327{
328 int r = cn->completion()->get_return_value();
329
330 set_status() << "request complete; ret=" << r;
331
332 return r;
333}
334
20effc67 335RGWRadosGetOmapValsCR::RGWRadosGetOmapValsCR(rgw::sal::RadosStore* _store,
f67539c2
TL
336 const rgw_raw_obj& _obj,
337 const string& _marker,
338 int _max_entries,
339 ResultPtr _result)
340 : RGWSimpleCoroutine(_store->ctx()), store(_store), obj(_obj),
341 marker(_marker), max_entries(_max_entries),
342 result(std::move(_result))
343{
344 ceph_assert(result); // must be allocated
345 set_description() << "get omap keys dest=" << obj << " marker=" << marker;
346}
347
b3b6e05e
TL
348int RGWRadosGetOmapValsCR::send_request(const DoutPrefixProvider *dpp) {
349 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &result->ref);
f67539c2 350 if (r < 0) {
b3b6e05e 351 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
f67539c2
TL
352 return r;
353 }
354
355 set_status() << "send request";
356
357 librados::ObjectReadOperation op;
358 op.omap_get_vals2(marker, max_entries, &result->entries, &result->more, nullptr);
359
360 cn = stack->create_completion_notifier(result);
361 return result->ref.pool.ioctx().aio_operate(result->ref.obj.oid, cn->completion(), &op, NULL);
362}
363
364int RGWRadosGetOmapValsCR::request_complete()
365{
366 int r = cn->completion()->get_return_value();
367
368 set_status() << "request complete; ret=" << r;
369
370 return r;
371}
372
20effc67 373RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(rgw::sal::RadosStore* _store,
7c673cae
FG
374 const rgw_raw_obj& _obj,
375 const set<string>& _keys) : RGWSimpleCoroutine(_store->ctx()),
376 store(_store),
377 keys(_keys),
378 obj(_obj), cn(NULL)
379{
380 set_description() << "remove omap keys dest=" << obj << " keys=" << keys;
381}
382
b3b6e05e
TL
383int RGWRadosRemoveOmapKeysCR::send_request(const DoutPrefixProvider *dpp) {
384 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
7c673cae 385 if (r < 0) {
b3b6e05e 386 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
7c673cae
FG
387 return r;
388 }
389
390 set_status() << "send request";
391
392 librados::ObjectWriteOperation op;
393 op.omap_rm_keys(keys);
394
395 cn = stack->create_completion_notifier();
9f95a23c 396 return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op);
7c673cae
FG
397}
398
224ce89b
WB
399int RGWRadosRemoveOmapKeysCR::request_complete()
400{
401 int r = cn->completion()->get_return_value();
402
403 set_status() << "request complete; ret=" << r;
404
405 return r;
406}
407
20effc67 408RGWRadosRemoveCR::RGWRadosRemoveCR(rgw::sal::RadosStore* store, const rgw_raw_obj& obj,
f67539c2
TL
409 RGWObjVersionTracker* objv_tracker)
410 : RGWSimpleCoroutine(store->ctx()),
411 store(store), obj(obj), objv_tracker(objv_tracker)
7c673cae
FG
412{
413 set_description() << "remove dest=" << obj;
414}
415
b3b6e05e 416int RGWRadosRemoveCR::send_request(const DoutPrefixProvider *dpp)
7c673cae 417{
9f95a23c 418 auto rados = store->getRados()->get_rados_handle();
7c673cae
FG
419 int r = rados->ioctx_create(obj.pool.name.c_str(), ioctx);
420 if (r < 0) {
421 lderr(cct) << "ERROR: failed to open pool (" << obj.pool.name << ") ret=" << r << dendl;
422 return r;
423 }
424 ioctx.locator_set_key(obj.loc);
425
426 set_status() << "send request";
427
428 librados::ObjectWriteOperation op;
f67539c2
TL
429 if (objv_tracker) {
430 objv_tracker->prepare_op_for_write(&op);
431 }
7c673cae
FG
432 op.remove();
433
434 cn = stack->create_completion_notifier();
435 return ioctx.aio_operate(obj.oid, cn->completion(), &op);
436}
437
438int RGWRadosRemoveCR::request_complete()
439{
440 int r = cn->completion()->get_return_value();
441
442 set_status() << "request complete; ret=" << r;
443
444 return r;
445}
446
20effc67 447RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store,
7c673cae
FG
448 const rgw_raw_obj& _obj,
449 const string& _lock_name,
450 const string& _cookie,
451 uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
452 async_rados(_async_rados),
453 store(_store),
454 lock_name(_lock_name),
455 cookie(_cookie),
456 duration(_duration),
457 obj(_obj),
458 req(NULL)
459{
460 set_description() << "rados lock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie << " duration=" << duration;
461}
462
463void RGWSimpleRadosLockCR::request_cleanup()
464{
465 if (req) {
466 req->finish();
467 req = NULL;
468 }
469}
470
b3b6e05e 471int RGWSimpleRadosLockCR::send_request(const DoutPrefixProvider *dpp)
7c673cae
FG
472{
473 set_status() << "sending request";
474 req = new RGWAsyncLockSystemObj(this, stack->create_completion_notifier(),
475 store, NULL, obj, lock_name, cookie, duration);
476 async_rados->queue(req);
477 return 0;
478}
479
480int RGWSimpleRadosLockCR::request_complete()
481{
482 set_status() << "request complete; ret=" << req->get_ret_status();
483 return req->get_ret_status();
484}
485
20effc67 486RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store,
7c673cae
FG
487 const rgw_raw_obj& _obj,
488 const string& _lock_name,
489 const string& _cookie) : RGWSimpleCoroutine(_store->ctx()),
490 async_rados(_async_rados),
491 store(_store),
492 lock_name(_lock_name),
493 cookie(_cookie),
494 obj(_obj),
495 req(NULL)
496{
497 set_description() << "rados unlock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie;
498}
499
500void RGWSimpleRadosUnlockCR::request_cleanup()
501{
502 if (req) {
503 req->finish();
504 req = NULL;
505 }
506}
507
b3b6e05e 508int RGWSimpleRadosUnlockCR::send_request(const DoutPrefixProvider *dpp)
7c673cae
FG
509{
510 set_status() << "sending request";
511
512 req = new RGWAsyncUnlockSystemObj(this, stack->create_completion_notifier(),
513 store, NULL, obj, lock_name, cookie);
514 async_rados->queue(req);
515 return 0;
516}
517
518int RGWSimpleRadosUnlockCR::request_complete()
519{
520 set_status() << "request complete; ret=" << req->get_ret_status();
521 return req->get_ret_status();
522}
523
b3b6e05e 524int RGWOmapAppend::operate(const DoutPrefixProvider *dpp) {
7c673cae
FG
525 reenter(this) {
526 for (;;) {
527 if (!has_product() && going_down) {
528 set_status() << "going down";
529 break;
530 }
531 set_status() << "waiting for product";
532 yield wait_for_product();
533 yield {
534 string entry;
535 while (consume(&entry)) {
536 set_status() << "adding entry: " << entry;
537 entries[entry] = bufferlist();
538 if (entries.size() >= window_size) {
539 break;
540 }
541 }
542 if (entries.size() >= window_size || going_down) {
543 set_status() << "flushing to omap";
544 call(new RGWRadosSetOmapKeysCR(store, obj, entries));
545 entries.clear();
546 }
547 }
548 if (get_ret_status() < 0) {
549 ldout(cct, 0) << "ERROR: failed to store entries in omap" << dendl;
550 return set_state(RGWCoroutine_Error);
551 }
552 }
553 /* done with coroutine */
554 return set_state(RGWCoroutine_Done);
555 }
556 return 0;
557}
558
559void RGWOmapAppend::flush_pending() {
560 receive(pending_entries);
561 num_pending_entries = 0;
562}
563
564bool RGWOmapAppend::append(const string& s) {
565 if (is_done()) {
566 return false;
567 }
568 ++total_entries;
569 pending_entries.push_back(s);
570 if (++num_pending_entries >= (int)window_size) {
571 flush_pending();
572 }
573 return true;
574}
575
576bool RGWOmapAppend::finish() {
577 going_down = true;
578 flush_pending();
579 set_sleeping(false);
580 return (!is_done());
581}
582
b3b6e05e 583int RGWAsyncGetBucketInstanceInfo::_send_request(const DoutPrefixProvider *dpp)
7c673cae 584{
9f95a23c
TL
585 int r;
586 if (!bucket.bucket_id.empty()) {
587 RGWSysObjectCtx obj_ctx = store->svc()->sysobj->init_obj_ctx();
b3b6e05e 588 r = store->getRados()->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, &attrs, null_yield, dpp);
9f95a23c 589 } else {
b3b6e05e 590 r = store->ctl()->bucket->read_bucket_info(bucket, &bucket_info, null_yield, dpp,
9f95a23c
TL
591 RGWBucketCtl::BucketInstance::GetParams().set_attrs(&attrs));
592 }
7c673cae 593 if (r < 0) {
b3b6e05e 594 ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info for "
9f95a23c 595 << bucket << dendl;
7c673cae
FG
596 return r;
597 }
598
599 return 0;
600}
601
b3b6e05e 602RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(const DoutPrefixProvider *dpp,
20effc67 603 rgw::sal::RadosStore* store,
b32b8144
FG
604 const RGWBucketInfo& bucket_info,
605 int shard_id,
606 const std::string& start_marker,
607 const std::string& end_marker)
20effc67
TL
608 : RGWSimpleCoroutine(store->ctx()), bucket_info(bucket_info),
609 shard_id(shard_id), bs(store->getRados()),
b32b8144
FG
610 start_marker(BucketIndexShardsManager::get_shard_marker(start_marker)),
611 end_marker(BucketIndexShardsManager::get_shard_marker(end_marker))
612{
b32b8144
FG
613}
614
b3b6e05e 615int RGWRadosBILogTrimCR::send_request(const DoutPrefixProvider *dpp)
b32b8144 616{
20effc67
TL
617 int r = bs.init(dpp, bucket_info, bucket_info.layout.current_index, shard_id);
618 if (r < 0) {
619 ldpp_dout(dpp, -1) << "ERROR: bucket shard init failed ret=" << r << dendl;
620 return r;
621 }
622
b32b8144
FG
623 bufferlist in;
624 cls_rgw_bi_log_trim_op call;
625 call.start_marker = std::move(start_marker);
626 call.end_marker = std::move(end_marker);
11fdf7f2 627 encode(call, in);
b32b8144
FG
628
629 librados::ObjectWriteOperation op;
630 op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
631
632 cn = stack->create_completion_notifier();
9f95a23c 633 return bs.bucket_obj.aio_operate(cn->completion(), &op);
b32b8144
FG
634}
635
636int RGWRadosBILogTrimCR::request_complete()
637{
638 int r = cn->completion()->get_return_value();
639 set_status() << "request complete; ret=" << r;
640 return r;
641}
642
b3b6e05e 643int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)
7c673cae
FG
644{
645 RGWObjectCtx obj_ctx(store);
646
7c673cae 647 char buf[16];
9f95a23c 648 snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id());
20effc67 649 rgw::sal::Attrs attrs;
7c673cae 650
20effc67
TL
651 rgw::sal::RadosBucket bucket(store, src_bucket);
652 rgw::sal::RadosObject src_obj(store, key, &bucket);
653 rgw::sal::RadosBucket dest_bucket(store, dest_bucket_info);
654 rgw::sal::RadosObject dest_obj(store, dest_key.value_or(key), &dest_bucket);
7c673cae 655
81eedcae 656 std::optional<uint64_t> bytes_transferred;
9f95a23c
TL
657 int r = store->getRados()->fetch_remote_obj(obj_ctx,
658 user_id.value_or(rgw_user()),
7c673cae
FG
659 NULL, /* req_info */
660 source_zone,
f67539c2
TL
661 &dest_obj,
662 &src_obj,
663 &dest_bucket, /* dest */
9f95a23c 664 nullptr, /* source */
11fdf7f2 665 dest_placement_rule,
7c673cae
FG
666 NULL, /* real_time* src_mtime, */
667 NULL, /* real_time* mtime, */
668 NULL, /* const real_time* mod_ptr, */
669 NULL, /* const real_time* unmod_ptr, */
670 false, /* high precision time */
671 NULL, /* const char *if_match, */
672 NULL, /* const char *if_nomatch, */
673 RGWRados::ATTRSMOD_NONE,
674 copy_if_newer,
675 attrs,
11fdf7f2 676 RGWObjCategory::Main,
7c673cae
FG
677 versioned_epoch,
678 real_time(), /* delete_at */
7c673cae
FG
679 NULL, /* string *ptag, */
680 NULL, /* string *petag, */
7c673cae 681 NULL, /* void (*progress_cb)(off_t, void *), */
31f18b77 682 NULL, /* void *progress_data*); */
9f95a23c
TL
683 dpp,
684 filter.get(),
81eedcae
TL
685 &zones_trace,
686 &bytes_transferred);
7c673cae
FG
687
688 if (r < 0) {
b3b6e05e 689 ldpp_dout(dpp, 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
81eedcae
TL
690 if (counters) {
691 counters->inc(sync_counters::l_fetch_err, 1);
692 }
693 } else if (counters) {
694 if (bytes_transferred) {
695 counters->inc(sync_counters::l_fetch, *bytes_transferred);
696 } else {
697 counters->inc(sync_counters::l_fetch_not_modified);
698 }
7c673cae
FG
699 }
700 return r;
701}
702
b3b6e05e 703int RGWAsyncStatRemoteObj::_send_request(const DoutPrefixProvider *dpp)
7c673cae
FG
704{
705 RGWObjectCtx obj_ctx(store);
706
707 string user_id;
708 char buf[16];
9f95a23c 709 snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id());
7c673cae 710
20effc67
TL
711 rgw::sal::RadosBucket bucket(store, src_bucket);
712 rgw::sal::RadosObject src_obj(store, key, &bucket);
7c673cae 713
b3b6e05e
TL
714 int r = store->getRados()->stat_remote_obj(dpp,
715 obj_ctx,
9f95a23c 716 rgw_user(user_id),
7c673cae
FG
717 nullptr, /* req_info */
718 source_zone,
f67539c2 719 &src_obj,
9f95a23c 720 nullptr, /* source */
7c673cae
FG
721 pmtime, /* real_time* src_mtime, */
722 psize, /* uint64_t * */
723 nullptr, /* const real_time* mod_ptr, */
724 nullptr, /* const real_time* unmod_ptr, */
725 true, /* high precision time */
726 nullptr, /* const char *if_match, */
727 nullptr, /* const char *if_nomatch, */
728 pattrs,
11fdf7f2 729 pheaders,
7c673cae
FG
730 nullptr,
731 nullptr, /* string *ptag, */
11fdf7f2 732 petag); /* string *petag, */
7c673cae
FG
733
734 if (r < 0) {
20effc67 735 ldpp_dout(dpp, 0) << "store->stat_remote_obj() returned r=" << r << dendl;
7c673cae
FG
736 }
737 return r;
738}
739
740
b3b6e05e 741int RGWAsyncRemoveObj::_send_request(const DoutPrefixProvider *dpp)
7c673cae
FG
742{
743 RGWObjectCtx obj_ctx(store);
744
745 rgw_obj obj(bucket_info.bucket, key);
746
b3b6e05e 747 ldpp_dout(dpp, 0) << __func__ << "(): deleting obj=" << obj << dendl;
7c673cae 748
11fdf7f2 749 obj_ctx.set_atomic(obj);
7c673cae
FG
750
751 RGWObjState *state;
752
b3b6e05e 753 int ret = store->getRados()->get_obj_state(dpp, &obj_ctx, bucket_info, obj, &state, null_yield);
7c673cae 754 if (ret < 0) {
b3b6e05e 755 ldpp_dout(dpp, 20) << __func__ << "(): get_obj_state() obj=" << obj << " returned ret=" << ret << dendl;
7c673cae
FG
756 return ret;
757 }
758
759 /* has there been any racing object write? */
760 if (del_if_older && (state->mtime > timestamp)) {
b3b6e05e 761 ldpp_dout(dpp, 20) << __func__ << "(): skipping object removal obj=" << obj << " (obj mtime=" << state->mtime << ", request timestamp=" << timestamp << ")" << dendl;
7c673cae
FG
762 return 0;
763 }
764
765 RGWAccessControlPolicy policy;
766
767 /* decode policy */
768 map<string, bufferlist>::iterator iter = state->attrset.find(RGW_ATTR_ACL);
769 if (iter != state->attrset.end()) {
11fdf7f2 770 auto bliter = iter->second.cbegin();
7c673cae
FG
771 try {
772 policy.decode(bliter);
773 } catch (buffer::error& err) {
b3b6e05e 774 ldpp_dout(dpp, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
7c673cae
FG
775 return -EIO;
776 }
777 }
778
9f95a23c 779 RGWRados::Object del_target(store->getRados(), bucket_info, obj_ctx, obj);
7c673cae
FG
780 RGWRados::Object::Delete del_op(&del_target);
781
782 del_op.params.bucket_owner = bucket_info.owner;
783 del_op.params.obj_owner = policy.get_owner();
784 if (del_if_older) {
785 del_op.params.unmod_since = timestamp;
786 }
787 if (versioned) {
788 del_op.params.versioning_status = BUCKET_VERSIONED;
789 }
790 del_op.params.olh_epoch = versioned_epoch;
791 del_op.params.marker_version_id = marker_version_id;
9f95a23c 792 del_op.params.obj_owner.set_id(rgw_user(owner));
7c673cae
FG
793 del_op.params.obj_owner.set_name(owner_display_name);
794 del_op.params.mtime = timestamp;
795 del_op.params.high_precision_time = true;
91327a77 796 del_op.params.zones_trace = &zones_trace;
7c673cae 797
b3b6e05e 798 ret = del_op.delete_obj(null_yield, dpp);
7c673cae 799 if (ret < 0) {
b3b6e05e 800 ldpp_dout(dpp, 20) << __func__ << "(): delete_obj() obj=" << obj << " returned ret=" << ret << dendl;
7c673cae
FG
801 }
802 return ret;
803}
804
b3b6e05e 805int RGWContinuousLeaseCR::operate(const DoutPrefixProvider *dpp)
7c673cae
FG
806{
807 if (aborted) {
808 caller->set_sleeping(false);
809 return set_cr_done();
810 }
811 reenter(this) {
812 while (!going_down) {
813 yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
814
815 caller->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
816 if (retcode < 0) {
817 set_locked(false);
818 ldout(store->ctx(), 20) << *this << ": couldn't lock " << obj << ":" << lock_name << ": retcode=" << retcode << dendl;
819 return set_state(RGWCoroutine_Error, retcode);
820 }
821 set_locked(true);
822 yield wait(utime_t(interval / 2, 0));
823 }
824 set_locked(false); /* moot at this point anyway */
825 yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie));
826 return set_state(RGWCoroutine_Done);
827 }
828 return 0;
829}
830
20effc67 831RGWRadosTimelogAddCR::RGWRadosTimelogAddCR(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const string& _oid,
7c673cae 832 const cls_log_entry& entry) : RGWSimpleCoroutine(_store->ctx()),
b3b6e05e 833 dpp(_dpp),
7c673cae
FG
834 store(_store),
835 oid(_oid), cn(NULL)
836{
837 stringstream& s = set_description();
838 s << "timelog add entry oid=" << oid << "entry={id=" << entry.id << ", section=" << entry.section << ", name=" << entry.name << "}";
839 entries.push_back(entry);
840}
841
b3b6e05e 842int RGWRadosTimelogAddCR::send_request(const DoutPrefixProvider *dpp)
7c673cae
FG
843{
844 set_status() << "sending request";
845
846 cn = stack->create_completion_notifier();
b3b6e05e 847 return store->svc()->cls->timelog.add(dpp, oid, entries, cn->completion(), true, null_yield);
7c673cae
FG
848}
849
850int RGWRadosTimelogAddCR::request_complete()
851{
852 int r = cn->completion()->get_return_value();
853
854 set_status() << "request complete; ret=" << r;
855
856 return r;
857}
858
20effc67
TL
859RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(const DoutPrefixProvider *dpp,
860 rgw::sal::RadosStore* store,
7c673cae
FG
861 const std::string& oid,
862 const real_time& start_time,
863 const real_time& end_time,
864 const std::string& from_marker,
865 const std::string& to_marker)
b3b6e05e 866 : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store), oid(oid),
7c673cae
FG
867 start_time(start_time), end_time(end_time),
868 from_marker(from_marker), to_marker(to_marker)
869{
870 set_description() << "timelog trim oid=" << oid
871 << " start_time=" << start_time << " end_time=" << end_time
872 << " from_marker=" << from_marker << " to_marker=" << to_marker;
873}
874
b3b6e05e 875int RGWRadosTimelogTrimCR::send_request(const DoutPrefixProvider *dpp)
7c673cae
FG
876{
877 set_status() << "sending request";
878
879 cn = stack->create_completion_notifier();
b3b6e05e 880 return store->svc()->cls->timelog.trim(dpp, oid, start_time, end_time, from_marker,
9f95a23c
TL
881 to_marker, cn->completion(),
882 null_yield);
7c673cae
FG
883}
884
885int RGWRadosTimelogTrimCR::request_complete()
886{
887 int r = cn->completion()->get_return_value();
888
889 set_status() << "request complete; ret=" << r;
890
891 return r;
892}
893
894
20effc67
TL
895RGWSyncLogTrimCR::RGWSyncLogTrimCR(const DoutPrefixProvider *dpp,
896 rgw::sal::RadosStore* store, const std::string& oid,
7c673cae
FG
897 const std::string& to_marker,
898 std::string *last_trim_marker)
b3b6e05e 899 : RGWRadosTimelogTrimCR(dpp, store, oid, real_time{}, real_time{},
7c673cae
FG
900 std::string{}, to_marker),
901 cct(store->ctx()), last_trim_marker(last_trim_marker)
902{
903}
904
905int RGWSyncLogTrimCR::request_complete()
906{
907 int r = RGWRadosTimelogTrimCR::request_complete();
a8e16298 908 if (r != -ENODATA) {
7c673cae
FG
909 return r;
910 }
a8e16298 911 // nothing left to trim, update last_trim_marker
eafe8130 912 if (*last_trim_marker < to_marker && to_marker != max_marker) {
7c673cae
FG
913 *last_trim_marker = to_marker;
914 }
915 return 0;
916}
917
918
b3b6e05e 919int RGWAsyncStatObj::_send_request(const DoutPrefixProvider *dpp)
7c673cae
FG
920{
921 rgw_raw_obj raw_obj;
9f95a23c 922 store->getRados()->obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
b3b6e05e 923 return store->getRados()->raw_obj_stat(dpp, raw_obj, psize, pmtime, pepoch,
9f95a23c 924 nullptr, nullptr, objv_tracker, null_yield);
7c673cae
FG
925}
926
20effc67
TL
927RGWStatObjCR::RGWStatObjCR(const DoutPrefixProvider *dpp,
928 RGWAsyncRadosProcessor *async_rados, rgw::sal::RadosStore* store,
7c673cae
FG
929 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize,
930 real_time* pmtime, uint64_t *pepoch,
931 RGWObjVersionTracker *objv_tracker)
b3b6e05e 932 : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store), async_rados(async_rados),
7c673cae
FG
933 bucket_info(_bucket_info), obj(obj), psize(psize), pmtime(pmtime), pepoch(pepoch),
934 objv_tracker(objv_tracker)
935{
936}
937
938void RGWStatObjCR::request_cleanup()
939{
940 if (req) {
941 req->finish();
942 req = NULL;
943 }
944}
945
b3b6e05e 946int RGWStatObjCR::send_request(const DoutPrefixProvider *dpp)
7c673cae 947{
b3b6e05e 948 req = new RGWAsyncStatObj(dpp, this, stack->create_completion_notifier(),
7c673cae
FG
949 store, bucket_info, obj, psize, pmtime, pepoch, objv_tracker);
950 async_rados->queue(req);
951 return 0;
952}
953
954int RGWStatObjCR::request_complete()
955{
956 return req->get_ret_status();
957}
b32b8144 958
20effc67 959RGWRadosNotifyCR::RGWRadosNotifyCR(rgw::sal::RadosStore* store, const rgw_raw_obj& obj,
b32b8144
FG
960 bufferlist& request, uint64_t timeout_ms,
961 bufferlist *response)
962 : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj),
963 request(request), timeout_ms(timeout_ms), response(response)
964{
965 set_description() << "notify dest=" << obj;
966}
967
b3b6e05e 968int RGWRadosNotifyCR::send_request(const DoutPrefixProvider *dpp)
b32b8144 969{
b3b6e05e 970 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
b32b8144 971 if (r < 0) {
b3b6e05e 972 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
b32b8144
FG
973 return r;
974 }
975
976 set_status() << "sending request";
977
978 cn = stack->create_completion_notifier();
9f95a23c 979 return ref.pool.ioctx().aio_notify(ref.obj.oid, cn->completion(), request,
b32b8144
FG
980 timeout_ms, response);
981}
982
983int RGWRadosNotifyCR::request_complete()
984{
985 int r = cn->completion()->get_return_value();
986
987 set_status() << "request complete; ret=" << r;
988
989 return r;
990}