]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rados.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / rgw_cr_rados.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "include/compat.h"
5 #include "rgw_sal.h"
6 #include "rgw_zone.h"
7 #include "rgw_coroutine.h"
8 #include "rgw_cr_rados.h"
9 #include "rgw_sync_counters.h"
10 #include "rgw_bucket.h"
11
12 #include "services/svc_zone.h"
13 #include "services/svc_zone_utils.h"
14 #include "services/svc_sys_obj.h"
15 #include "services/svc_cls.h"
16
17 #include "cls/lock/cls_lock_client.h"
18 #include "cls/rgw/cls_rgw_client.h"
19
20 #include <boost/asio/yield.hpp>
21
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_rgw
24
25 bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest *req) {
26 if (processor->is_going_down()) {
27 return false;
28 }
29 req->get();
30 processor->m_req_queue.push_back(req);
31 dout(20) << "enqueued request req=" << hex << req << dec << dendl;
32 _dump_queue();
33 return true;
34 }
35
36 bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
37 return processor->m_req_queue.empty();
38 }
39
40 RGWAsyncRadosRequest *RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
41 if (processor->m_req_queue.empty())
42 return NULL;
43 RGWAsyncRadosRequest *req = processor->m_req_queue.front();
44 processor->m_req_queue.pop_front();
45 dout(20) << "dequeued request req=" << hex << req << dec << dendl;
46 _dump_queue();
47 return req;
48 }
49
50 void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) {
51 processor->handle_request(this, req);
52 processor->req_throttle.put(1);
53 }
54
55 void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
56 if (!g_conf()->subsys.should_gather<ceph_subsys_rgw, 20>()) {
57 return;
58 }
59 deque<RGWAsyncRadosRequest *>::iterator iter;
60 if (processor->m_req_queue.empty()) {
61 dout(20) << "RGWWQ: empty" << dendl;
62 return;
63 }
64 dout(20) << "RGWWQ:" << dendl;
65 for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) {
66 dout(20) << "req: " << hex << *iter << dec << dendl;
67 }
68 }
69
70 RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(CephContext *_cct, int num_threads)
71 : cct(_cct), m_tp(cct, "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
72 req_throttle(_cct, "rgw_async_rados_ops", num_threads * 2),
73 req_wq(this,
74 ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
75 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
76 &m_tp) {
77 }
78
79 void RGWAsyncRadosProcessor::start() {
80 m_tp.start();
81 }
82
83 void RGWAsyncRadosProcessor::stop() {
84 going_down = true;
85 m_tp.drain(&req_wq);
86 m_tp.stop();
87 for (auto iter = m_req_queue.begin(); iter != m_req_queue.end(); ++iter) {
88 (*iter)->put();
89 }
90 }
91
92 void RGWAsyncRadosProcessor::handle_request(const DoutPrefixProvider *dpp, RGWAsyncRadosRequest *req) {
93 req->send_request(dpp);
94 req->put();
95 }
96
97 void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest *req) {
98 req_throttle.get(1);
99 req_wq.queue(req);
100 }
101
102 int RGWAsyncGetSystemObj::_send_request(const DoutPrefixProvider *dpp)
103 {
104 map<string, bufferlist> *pattrs = want_attrs ? &attrs : nullptr;
105
106 auto sysobj = obj_ctx.get_obj(obj);
107 return sysobj.rop()
108 .set_objv_tracker(&objv_tracker)
109 .set_attrs(pattrs)
110 .set_raw_attrs(raw_attrs)
111 .read(dpp, &bl, null_yield);
112 }
113
114 RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(const DoutPrefixProvider *_dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
115 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
116 bool want_attrs, bool raw_attrs)
117 : RGWAsyncRadosRequest(caller, cn), dpp(_dpp), obj_ctx(_svc),
118 obj(_obj), want_attrs(want_attrs), raw_attrs(raw_attrs)
119 {
120 if (_objv_tracker) {
121 objv_tracker = *_objv_tracker;
122 }
123 }
124
125 int RGWSimpleRadosReadAttrsCR::send_request(const DoutPrefixProvider *dpp)
126 {
127 req = new RGWAsyncGetSystemObj(dpp, this, stack->create_completion_notifier(),
128 svc, objv_tracker, obj, true, raw_attrs);
129 async_rados->queue(req);
130 return 0;
131 }
132
133 int RGWSimpleRadosReadAttrsCR::request_complete()
134 {
135 if (pattrs) {
136 *pattrs = std::move(req->attrs);
137 }
138 if (objv_tracker) {
139 *objv_tracker = req->objv_tracker;
140 }
141 return req->get_ret_status();
142 }
143
144 int RGWAsyncPutSystemObj::_send_request(const DoutPrefixProvider *dpp)
145 {
146 auto obj_ctx = svc->init_obj_ctx();
147 auto sysobj = obj_ctx.get_obj(obj);
148 return sysobj.wop()
149 .set_objv_tracker(&objv_tracker)
150 .set_exclusive(exclusive)
151 .write_data(dpp, bl, null_yield);
152 }
153
154 RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(const DoutPrefixProvider *_dpp,
155 RGWCoroutine *caller,
156 RGWAioCompletionNotifier *cn,
157 RGWSI_SysObj *_svc,
158 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
159 bool _exclusive, bufferlist _bl)
160 : RGWAsyncRadosRequest(caller, cn), dpp(_dpp), svc(_svc),
161 obj(_obj), exclusive(_exclusive), bl(std::move(_bl))
162 {
163 if (_objv_tracker) {
164 objv_tracker = *_objv_tracker;
165 }
166 }
167
168 int RGWAsyncPutSystemObjAttrs::_send_request(const DoutPrefixProvider *dpp)
169 {
170 auto obj_ctx = svc->init_obj_ctx();
171 auto sysobj = obj_ctx.get_obj(obj);
172 return sysobj.wop()
173 .set_objv_tracker(&objv_tracker)
174 .set_exclusive(false)
175 .set_attrs(attrs)
176 .write_attrs(dpp, null_yield);
177 }
178
179 RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(const DoutPrefixProvider *_dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
180 RGWSI_SysObj *_svc,
181 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
182 map<string, bufferlist> _attrs)
183 : RGWAsyncRadosRequest(caller, cn), dpp(_dpp), svc(_svc),
184 obj(_obj), attrs(std::move(_attrs))
185 {
186 if (_objv_tracker) {
187 objv_tracker = *_objv_tracker;
188 }
189 }
190
191
192 RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store, const rgw_raw_obj& _obj,
193 uint64_t _window_size)
194 : RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
195 store(_store), obj(_obj), going_down(false), num_pending_entries(0), window_size(_window_size), total_entries(0)
196 {
197 }
198
199 int RGWAsyncLockSystemObj::_send_request(const DoutPrefixProvider *dpp)
200 {
201 rgw_rados_ref ref;
202 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
203 if (r < 0) {
204 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
205 return r;
206 }
207
208 rados::cls::lock::Lock l(lock_name);
209 utime_t duration(duration_secs, 0);
210 l.set_duration(duration);
211 l.set_cookie(cookie);
212 l.set_may_renew(true);
213
214 return l.lock_exclusive(&ref.pool.ioctx(), ref.obj.oid);
215 }
216
217 RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
218 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
219 const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(caller, cn), store(_store),
220 obj(_obj),
221 lock_name(_name),
222 cookie(_cookie),
223 duration_secs(_duration_secs)
224 {
225 }
226
227 int RGWAsyncUnlockSystemObj::_send_request(const DoutPrefixProvider *dpp)
228 {
229 rgw_rados_ref ref;
230 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
231 if (r < 0) {
232 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
233 return r;
234 }
235
236 rados::cls::lock::Lock l(lock_name);
237
238 l.set_cookie(cookie);
239
240 return l.unlock(&ref.pool.ioctx(), ref.obj.oid);
241 }
242
243 RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
244 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
245 const string& _name, const string& _cookie) : RGWAsyncRadosRequest(caller, cn), store(_store),
246 obj(_obj),
247 lock_name(_name), cookie(_cookie)
248 {
249 }
250
251 RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(rgw::sal::RGWRadosStore *_store,
252 const rgw_raw_obj& _obj,
253 map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
254 store(_store),
255 entries(_entries),
256 obj(_obj), cn(NULL)
257 {
258 stringstream& s = set_description();
259 s << "set omap keys dest=" << obj << " keys=[" << s.str() << "]";
260 for (auto i = entries.begin(); i != entries.end(); ++i) {
261 if (i != entries.begin()) {
262 s << ", ";
263 }
264 s << i->first;
265 }
266 s << "]";
267 }
268
269 int RGWRadosSetOmapKeysCR::send_request(const DoutPrefixProvider *dpp)
270 {
271 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
272 if (r < 0) {
273 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
274 return r;
275 }
276
277 set_status() << "sending request";
278
279 librados::ObjectWriteOperation op;
280 op.omap_set(entries);
281
282 cn = stack->create_completion_notifier();
283 return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op);
284 }
285
286 int RGWRadosSetOmapKeysCR::request_complete()
287 {
288 int r = cn->completion()->get_return_value();
289
290 set_status() << "request complete; ret=" << r;
291
292 return r;
293 }
294
295 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(rgw::sal::RGWRadosStore *_store,
296 const rgw_raw_obj& _obj,
297 const string& _marker,
298 int _max_entries,
299 ResultPtr _result)
300 : RGWSimpleCoroutine(_store->ctx()), store(_store), obj(_obj),
301 marker(_marker), max_entries(_max_entries),
302 result(std::move(_result))
303 {
304 ceph_assert(result); // must be allocated
305 set_description() << "get omap keys dest=" << obj << " marker=" << marker;
306 }
307
308 int RGWRadosGetOmapKeysCR::send_request(const DoutPrefixProvider *dpp) {
309 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &result->ref);
310 if (r < 0) {
311 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
312 return r;
313 }
314
315 set_status() << "send request";
316
317 librados::ObjectReadOperation op;
318 op.omap_get_keys2(marker, max_entries, &result->entries, &result->more, nullptr);
319
320 cn = stack->create_completion_notifier(result);
321 return result->ref.pool.ioctx().aio_operate(result->ref.obj.oid, cn->completion(), &op, NULL);
322 }
323
324 int RGWRadosGetOmapKeysCR::request_complete()
325 {
326 int r = cn->completion()->get_return_value();
327
328 set_status() << "request complete; ret=" << r;
329
330 return r;
331 }
332
333 RGWRadosGetOmapValsCR::RGWRadosGetOmapValsCR(rgw::sal::RGWRadosStore *_store,
334 const rgw_raw_obj& _obj,
335 const string& _marker,
336 int _max_entries,
337 ResultPtr _result)
338 : RGWSimpleCoroutine(_store->ctx()), store(_store), obj(_obj),
339 marker(_marker), max_entries(_max_entries),
340 result(std::move(_result))
341 {
342 ceph_assert(result); // must be allocated
343 set_description() << "get omap keys dest=" << obj << " marker=" << marker;
344 }
345
346 int RGWRadosGetOmapValsCR::send_request(const DoutPrefixProvider *dpp) {
347 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &result->ref);
348 if (r < 0) {
349 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
350 return r;
351 }
352
353 set_status() << "send request";
354
355 librados::ObjectReadOperation op;
356 op.omap_get_vals2(marker, max_entries, &result->entries, &result->more, nullptr);
357
358 cn = stack->create_completion_notifier(result);
359 return result->ref.pool.ioctx().aio_operate(result->ref.obj.oid, cn->completion(), &op, NULL);
360 }
361
362 int RGWRadosGetOmapValsCR::request_complete()
363 {
364 int r = cn->completion()->get_return_value();
365
366 set_status() << "request complete; ret=" << r;
367
368 return r;
369 }
370
371 RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(rgw::sal::RGWRadosStore *_store,
372 const rgw_raw_obj& _obj,
373 const set<string>& _keys) : RGWSimpleCoroutine(_store->ctx()),
374 store(_store),
375 keys(_keys),
376 obj(_obj), cn(NULL)
377 {
378 set_description() << "remove omap keys dest=" << obj << " keys=" << keys;
379 }
380
381 int RGWRadosRemoveOmapKeysCR::send_request(const DoutPrefixProvider *dpp) {
382 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
383 if (r < 0) {
384 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
385 return r;
386 }
387
388 set_status() << "send request";
389
390 librados::ObjectWriteOperation op;
391 op.omap_rm_keys(keys);
392
393 cn = stack->create_completion_notifier();
394 return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op);
395 }
396
397 int RGWRadosRemoveOmapKeysCR::request_complete()
398 {
399 int r = cn->completion()->get_return_value();
400
401 set_status() << "request complete; ret=" << r;
402
403 return r;
404 }
405
406 RGWRadosRemoveCR::RGWRadosRemoveCR(rgw::sal::RGWRadosStore *store, const rgw_raw_obj& obj,
407 RGWObjVersionTracker* objv_tracker)
408 : RGWSimpleCoroutine(store->ctx()),
409 store(store), obj(obj), objv_tracker(objv_tracker)
410 {
411 set_description() << "remove dest=" << obj;
412 }
413
414 int RGWRadosRemoveCR::send_request(const DoutPrefixProvider *dpp)
415 {
416 auto rados = store->getRados()->get_rados_handle();
417 int r = rados->ioctx_create(obj.pool.name.c_str(), ioctx);
418 if (r < 0) {
419 lderr(cct) << "ERROR: failed to open pool (" << obj.pool.name << ") ret=" << r << dendl;
420 return r;
421 }
422 ioctx.locator_set_key(obj.loc);
423
424 set_status() << "send request";
425
426 librados::ObjectWriteOperation op;
427 if (objv_tracker) {
428 objv_tracker->prepare_op_for_write(&op);
429 }
430 op.remove();
431
432 cn = stack->create_completion_notifier();
433 return ioctx.aio_operate(obj.oid, cn->completion(), &op);
434 }
435
436 int RGWRadosRemoveCR::request_complete()
437 {
438 int r = cn->completion()->get_return_value();
439
440 set_status() << "request complete; ret=" << r;
441
442 return r;
443 }
444
445 RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
446 const rgw_raw_obj& _obj,
447 const string& _lock_name,
448 const string& _cookie,
449 uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
450 async_rados(_async_rados),
451 store(_store),
452 lock_name(_lock_name),
453 cookie(_cookie),
454 duration(_duration),
455 obj(_obj),
456 req(NULL)
457 {
458 set_description() << "rados lock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie << " duration=" << duration;
459 }
460
461 void RGWSimpleRadosLockCR::request_cleanup()
462 {
463 if (req) {
464 req->finish();
465 req = NULL;
466 }
467 }
468
469 int RGWSimpleRadosLockCR::send_request(const DoutPrefixProvider *dpp)
470 {
471 set_status() << "sending request";
472 req = new RGWAsyncLockSystemObj(this, stack->create_completion_notifier(),
473 store, NULL, obj, lock_name, cookie, duration);
474 async_rados->queue(req);
475 return 0;
476 }
477
478 int RGWSimpleRadosLockCR::request_complete()
479 {
480 set_status() << "request complete; ret=" << req->get_ret_status();
481 return req->get_ret_status();
482 }
483
484 RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
485 const rgw_raw_obj& _obj,
486 const string& _lock_name,
487 const string& _cookie) : RGWSimpleCoroutine(_store->ctx()),
488 async_rados(_async_rados),
489 store(_store),
490 lock_name(_lock_name),
491 cookie(_cookie),
492 obj(_obj),
493 req(NULL)
494 {
495 set_description() << "rados unlock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie;
496 }
497
498 void RGWSimpleRadosUnlockCR::request_cleanup()
499 {
500 if (req) {
501 req->finish();
502 req = NULL;
503 }
504 }
505
506 int RGWSimpleRadosUnlockCR::send_request(const DoutPrefixProvider *dpp)
507 {
508 set_status() << "sending request";
509
510 req = new RGWAsyncUnlockSystemObj(this, stack->create_completion_notifier(),
511 store, NULL, obj, lock_name, cookie);
512 async_rados->queue(req);
513 return 0;
514 }
515
516 int RGWSimpleRadosUnlockCR::request_complete()
517 {
518 set_status() << "request complete; ret=" << req->get_ret_status();
519 return req->get_ret_status();
520 }
521
522 int RGWOmapAppend::operate(const DoutPrefixProvider *dpp) {
523 reenter(this) {
524 for (;;) {
525 if (!has_product() && going_down) {
526 set_status() << "going down";
527 break;
528 }
529 set_status() << "waiting for product";
530 yield wait_for_product();
531 yield {
532 string entry;
533 while (consume(&entry)) {
534 set_status() << "adding entry: " << entry;
535 entries[entry] = bufferlist();
536 if (entries.size() >= window_size) {
537 break;
538 }
539 }
540 if (entries.size() >= window_size || going_down) {
541 set_status() << "flushing to omap";
542 call(new RGWRadosSetOmapKeysCR(store, obj, entries));
543 entries.clear();
544 }
545 }
546 if (get_ret_status() < 0) {
547 ldout(cct, 0) << "ERROR: failed to store entries in omap" << dendl;
548 return set_state(RGWCoroutine_Error);
549 }
550 }
551 /* done with coroutine */
552 return set_state(RGWCoroutine_Done);
553 }
554 return 0;
555 }
556
557 void RGWOmapAppend::flush_pending() {
558 receive(pending_entries);
559 num_pending_entries = 0;
560 }
561
562 bool RGWOmapAppend::append(const string& s) {
563 if (is_done()) {
564 return false;
565 }
566 ++total_entries;
567 pending_entries.push_back(s);
568 if (++num_pending_entries >= (int)window_size) {
569 flush_pending();
570 }
571 return true;
572 }
573
574 bool RGWOmapAppend::finish() {
575 going_down = true;
576 flush_pending();
577 set_sleeping(false);
578 return (!is_done());
579 }
580
581 int RGWAsyncGetBucketInstanceInfo::_send_request(const DoutPrefixProvider *dpp)
582 {
583 int r;
584 if (!bucket.bucket_id.empty()) {
585 RGWSysObjectCtx obj_ctx = store->svc()->sysobj->init_obj_ctx();
586 r = store->getRados()->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, &attrs, null_yield, dpp);
587 } else {
588 r = store->ctl()->bucket->read_bucket_info(bucket, &bucket_info, null_yield, dpp,
589 RGWBucketCtl::BucketInstance::GetParams().set_attrs(&attrs));
590 }
591 if (r < 0) {
592 ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info for "
593 << bucket << dendl;
594 return r;
595 }
596
597 return 0;
598 }
599
600 RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(const DoutPrefixProvider *dpp,
601 rgw::sal::RGWRadosStore *store,
602 const RGWBucketInfo& bucket_info,
603 int shard_id,
604 const std::string& start_marker,
605 const std::string& end_marker)
606 : RGWSimpleCoroutine(store->ctx()), bs(store->getRados()),
607 start_marker(BucketIndexShardsManager::get_shard_marker(start_marker)),
608 end_marker(BucketIndexShardsManager::get_shard_marker(end_marker))
609 {
610 bs.init(dpp, bucket_info, bucket_info.layout.current_index, shard_id);
611 }
612
613 int RGWRadosBILogTrimCR::send_request(const DoutPrefixProvider *dpp)
614 {
615 bufferlist in;
616 cls_rgw_bi_log_trim_op call;
617 call.start_marker = std::move(start_marker);
618 call.end_marker = std::move(end_marker);
619 encode(call, in);
620
621 librados::ObjectWriteOperation op;
622 op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
623
624 cn = stack->create_completion_notifier();
625 return bs.bucket_obj.aio_operate(cn->completion(), &op);
626 }
627
628 int RGWRadosBILogTrimCR::request_complete()
629 {
630 int r = cn->completion()->get_return_value();
631 set_status() << "request complete; ret=" << r;
632 return r;
633 }
634
635 int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)
636 {
637 RGWObjectCtx obj_ctx(store);
638
639 char buf[16];
640 snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id());
641 rgw::sal::RGWAttrs attrs;
642
643 rgw::sal::RGWRadosBucket bucket(store, src_bucket);
644 rgw::sal::RGWRadosObject src_obj(store, key, &bucket);
645 rgw::sal::RGWRadosBucket dest_bucket(store, dest_bucket_info);
646 rgw::sal::RGWRadosObject dest_obj(store, dest_key.value_or(key), &dest_bucket);
647
648 std::optional<uint64_t> bytes_transferred;
649 int r = store->getRados()->fetch_remote_obj(obj_ctx,
650 user_id.value_or(rgw_user()),
651 NULL, /* req_info */
652 source_zone,
653 &dest_obj,
654 &src_obj,
655 &dest_bucket, /* dest */
656 nullptr, /* source */
657 dest_placement_rule,
658 NULL, /* real_time* src_mtime, */
659 NULL, /* real_time* mtime, */
660 NULL, /* const real_time* mod_ptr, */
661 NULL, /* const real_time* unmod_ptr, */
662 false, /* high precision time */
663 NULL, /* const char *if_match, */
664 NULL, /* const char *if_nomatch, */
665 RGWRados::ATTRSMOD_NONE,
666 copy_if_newer,
667 attrs,
668 RGWObjCategory::Main,
669 versioned_epoch,
670 real_time(), /* delete_at */
671 NULL, /* string *ptag, */
672 NULL, /* string *petag, */
673 NULL, /* void (*progress_cb)(off_t, void *), */
674 NULL, /* void *progress_data*); */
675 dpp,
676 filter.get(),
677 &zones_trace,
678 &bytes_transferred);
679
680 if (r < 0) {
681 ldpp_dout(dpp, 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
682 if (counters) {
683 counters->inc(sync_counters::l_fetch_err, 1);
684 }
685 } else if (counters) {
686 if (bytes_transferred) {
687 counters->inc(sync_counters::l_fetch, *bytes_transferred);
688 } else {
689 counters->inc(sync_counters::l_fetch_not_modified);
690 }
691 }
692 return r;
693 }
694
695 int RGWAsyncStatRemoteObj::_send_request(const DoutPrefixProvider *dpp)
696 {
697 RGWObjectCtx obj_ctx(store);
698
699 string user_id;
700 char buf[16];
701 snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id());
702
703 rgw::sal::RGWRadosBucket bucket(store, src_bucket);
704 rgw::sal::RGWRadosObject src_obj(store, key, &bucket);
705
706 int r = store->getRados()->stat_remote_obj(dpp,
707 obj_ctx,
708 rgw_user(user_id),
709 nullptr, /* req_info */
710 source_zone,
711 &src_obj,
712 nullptr, /* source */
713 pmtime, /* real_time* src_mtime, */
714 psize, /* uint64_t * */
715 nullptr, /* const real_time* mod_ptr, */
716 nullptr, /* const real_time* unmod_ptr, */
717 true, /* high precision time */
718 nullptr, /* const char *if_match, */
719 nullptr, /* const char *if_nomatch, */
720 pattrs,
721 pheaders,
722 nullptr,
723 nullptr, /* string *ptag, */
724 petag); /* string *petag, */
725
726 if (r < 0) {
727 ldpp_dout(dpp, 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
728 }
729 return r;
730 }
731
732
733 int RGWAsyncRemoveObj::_send_request(const DoutPrefixProvider *dpp)
734 {
735 RGWObjectCtx obj_ctx(store);
736
737 rgw_obj obj(bucket_info.bucket, key);
738
739 ldpp_dout(dpp, 0) << __func__ << "(): deleting obj=" << obj << dendl;
740
741 obj_ctx.set_atomic(obj);
742
743 RGWObjState *state;
744
745 int ret = store->getRados()->get_obj_state(dpp, &obj_ctx, bucket_info, obj, &state, null_yield);
746 if (ret < 0) {
747 ldpp_dout(dpp, 20) << __func__ << "(): get_obj_state() obj=" << obj << " returned ret=" << ret << dendl;
748 return ret;
749 }
750
751 /* has there been any racing object write? */
752 if (del_if_older && (state->mtime > timestamp)) {
753 ldpp_dout(dpp, 20) << __func__ << "(): skipping object removal obj=" << obj << " (obj mtime=" << state->mtime << ", request timestamp=" << timestamp << ")" << dendl;
754 return 0;
755 }
756
757 RGWAccessControlPolicy policy;
758
759 /* decode policy */
760 map<string, bufferlist>::iterator iter = state->attrset.find(RGW_ATTR_ACL);
761 if (iter != state->attrset.end()) {
762 auto bliter = iter->second.cbegin();
763 try {
764 policy.decode(bliter);
765 } catch (buffer::error& err) {
766 ldpp_dout(dpp, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
767 return -EIO;
768 }
769 }
770
771 RGWRados::Object del_target(store->getRados(), bucket_info, obj_ctx, obj);
772 RGWRados::Object::Delete del_op(&del_target);
773
774 del_op.params.bucket_owner = bucket_info.owner;
775 del_op.params.obj_owner = policy.get_owner();
776 if (del_if_older) {
777 del_op.params.unmod_since = timestamp;
778 }
779 if (versioned) {
780 del_op.params.versioning_status = BUCKET_VERSIONED;
781 }
782 del_op.params.olh_epoch = versioned_epoch;
783 del_op.params.marker_version_id = marker_version_id;
784 del_op.params.obj_owner.set_id(rgw_user(owner));
785 del_op.params.obj_owner.set_name(owner_display_name);
786 del_op.params.mtime = timestamp;
787 del_op.params.high_precision_time = true;
788 del_op.params.zones_trace = &zones_trace;
789
790 ret = del_op.delete_obj(null_yield, dpp);
791 if (ret < 0) {
792 ldpp_dout(dpp, 20) << __func__ << "(): delete_obj() obj=" << obj << " returned ret=" << ret << dendl;
793 }
794 return ret;
795 }
796
797 int RGWContinuousLeaseCR::operate(const DoutPrefixProvider *dpp)
798 {
799 if (aborted) {
800 caller->set_sleeping(false);
801 return set_cr_done();
802 }
803 reenter(this) {
804 while (!going_down) {
805 yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
806
807 caller->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
808 if (retcode < 0) {
809 set_locked(false);
810 ldout(store->ctx(), 20) << *this << ": couldn't lock " << obj << ":" << lock_name << ": retcode=" << retcode << dendl;
811 return set_state(RGWCoroutine_Error, retcode);
812 }
813 set_locked(true);
814 yield wait(utime_t(interval / 2, 0));
815 }
816 set_locked(false); /* moot at this point anyway */
817 yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie));
818 return set_state(RGWCoroutine_Done);
819 }
820 return 0;
821 }
822
823 RGWRadosTimelogAddCR::RGWRadosTimelogAddCR(const DoutPrefixProvider *_dpp, rgw::sal::RGWRadosStore *_store, const string& _oid,
824 const cls_log_entry& entry) : RGWSimpleCoroutine(_store->ctx()),
825 dpp(_dpp),
826 store(_store),
827 oid(_oid), cn(NULL)
828 {
829 stringstream& s = set_description();
830 s << "timelog add entry oid=" << oid << "entry={id=" << entry.id << ", section=" << entry.section << ", name=" << entry.name << "}";
831 entries.push_back(entry);
832 }
833
834 int RGWRadosTimelogAddCR::send_request(const DoutPrefixProvider *dpp)
835 {
836 set_status() << "sending request";
837
838 cn = stack->create_completion_notifier();
839 return store->svc()->cls->timelog.add(dpp, oid, entries, cn->completion(), true, null_yield);
840 }
841
842 int RGWRadosTimelogAddCR::request_complete()
843 {
844 int r = cn->completion()->get_return_value();
845
846 set_status() << "request complete; ret=" << r;
847
848 return r;
849 }
850
851 RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(const DoutPrefixProvider *dpp,
852 rgw::sal::RGWRadosStore *store,
853 const std::string& oid,
854 const real_time& start_time,
855 const real_time& end_time,
856 const std::string& from_marker,
857 const std::string& to_marker)
858 : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store), oid(oid),
859 start_time(start_time), end_time(end_time),
860 from_marker(from_marker), to_marker(to_marker)
861 {
862 set_description() << "timelog trim oid=" << oid
863 << " start_time=" << start_time << " end_time=" << end_time
864 << " from_marker=" << from_marker << " to_marker=" << to_marker;
865 }
866
867 int RGWRadosTimelogTrimCR::send_request(const DoutPrefixProvider *dpp)
868 {
869 set_status() << "sending request";
870
871 cn = stack->create_completion_notifier();
872 return store->svc()->cls->timelog.trim(dpp, oid, start_time, end_time, from_marker,
873 to_marker, cn->completion(),
874 null_yield);
875 }
876
877 int RGWRadosTimelogTrimCR::request_complete()
878 {
879 int r = cn->completion()->get_return_value();
880
881 set_status() << "request complete; ret=" << r;
882
883 return r;
884 }
885
886
887 RGWSyncLogTrimCR::RGWSyncLogTrimCR(const DoutPrefixProvider *dpp,
888 rgw::sal::RGWRadosStore *store, const std::string& oid,
889 const std::string& to_marker,
890 std::string *last_trim_marker)
891 : RGWRadosTimelogTrimCR(dpp, store, oid, real_time{}, real_time{},
892 std::string{}, to_marker),
893 cct(store->ctx()), last_trim_marker(last_trim_marker)
894 {
895 }
896
897 int RGWSyncLogTrimCR::request_complete()
898 {
899 int r = RGWRadosTimelogTrimCR::request_complete();
900 if (r != -ENODATA) {
901 return r;
902 }
903 // nothing left to trim, update last_trim_marker
904 if (*last_trim_marker < to_marker && to_marker != max_marker) {
905 *last_trim_marker = to_marker;
906 }
907 return 0;
908 }
909
910
911 int RGWAsyncStatObj::_send_request(const DoutPrefixProvider *dpp)
912 {
913 rgw_raw_obj raw_obj;
914 store->getRados()->obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
915 return store->getRados()->raw_obj_stat(dpp, raw_obj, psize, pmtime, pepoch,
916 nullptr, nullptr, objv_tracker, null_yield);
917 }
918
919 RGWStatObjCR::RGWStatObjCR(const DoutPrefixProvider *dpp,
920 RGWAsyncRadosProcessor *async_rados, rgw::sal::RGWRadosStore *store,
921 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize,
922 real_time* pmtime, uint64_t *pepoch,
923 RGWObjVersionTracker *objv_tracker)
924 : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store), async_rados(async_rados),
925 bucket_info(_bucket_info), obj(obj), psize(psize), pmtime(pmtime), pepoch(pepoch),
926 objv_tracker(objv_tracker)
927 {
928 }
929
930 void RGWStatObjCR::request_cleanup()
931 {
932 if (req) {
933 req->finish();
934 req = NULL;
935 }
936 }
937
938 int RGWStatObjCR::send_request(const DoutPrefixProvider *dpp)
939 {
940 req = new RGWAsyncStatObj(dpp, this, stack->create_completion_notifier(),
941 store, bucket_info, obj, psize, pmtime, pepoch, objv_tracker);
942 async_rados->queue(req);
943 return 0;
944 }
945
946 int RGWStatObjCR::request_complete()
947 {
948 return req->get_ret_status();
949 }
950
951 RGWRadosNotifyCR::RGWRadosNotifyCR(rgw::sal::RGWRadosStore *store, const rgw_raw_obj& obj,
952 bufferlist& request, uint64_t timeout_ms,
953 bufferlist *response)
954 : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj),
955 request(request), timeout_ms(timeout_ms), response(response)
956 {
957 set_description() << "notify dest=" << obj;
958 }
959
960 int RGWRadosNotifyCR::send_request(const DoutPrefixProvider *dpp)
961 {
962 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
963 if (r < 0) {
964 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
965 return r;
966 }
967
968 set_status() << "sending request";
969
970 cn = stack->create_completion_notifier();
971 return ref.pool.ioctx().aio_notify(ref.obj.oid, cn->completion(), request,
972 timeout_ms, response);
973 }
974
975 int RGWRadosNotifyCR::request_complete()
976 {
977 int r = cn->completion()->get_return_value();
978
979 set_status() << "request complete; ret=" << r;
980
981 return r;
982 }