// Source: git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rados.cc
// Revision note: update sources to 12.2.10
// Path: [ceph.git] / ceph / src / rgw / rgw_cr_rados.cc
1 #include "rgw_rados.h"
2 #include "rgw_coroutine.h"
3 #include "rgw_cr_rados.h"
4
5 #include "cls/lock/cls_lock_client.h"
6 #include "cls/rgw/cls_rgw_client.h"
7
8 #include <boost/asio/yield.hpp>
9
10 #define dout_context g_ceph_context
11 #define dout_subsys ceph_subsys_rgw
12
13 bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest *req) {
14 if (processor->is_going_down()) {
15 return false;
16 }
17 req->get();
18 processor->m_req_queue.push_back(req);
19 dout(20) << "enqueued request req=" << hex << req << dec << dendl;
20 _dump_queue();
21 return true;
22 }
23
24 bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
25 return processor->m_req_queue.empty();
26 }
27
28 RGWAsyncRadosRequest *RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
29 if (processor->m_req_queue.empty())
30 return NULL;
31 RGWAsyncRadosRequest *req = processor->m_req_queue.front();
32 processor->m_req_queue.pop_front();
33 dout(20) << "dequeued request req=" << hex << req << dec << dendl;
34 _dump_queue();
35 return req;
36 }
37
38 void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) {
39 processor->handle_request(req);
40 processor->req_throttle.put(1);
41 }
42
43 void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
44 if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
45 return;
46 }
47 deque<RGWAsyncRadosRequest *>::iterator iter;
48 if (processor->m_req_queue.empty()) {
49 dout(20) << "RGWWQ: empty" << dendl;
50 return;
51 }
52 dout(20) << "RGWWQ:" << dendl;
53 for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) {
54 dout(20) << "req: " << hex << *iter << dec << dendl;
55 }
56 }
57
58 RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(RGWRados *_store, int num_threads)
59 : store(_store), m_tp(store->ctx(), "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
60 req_throttle(store->ctx(), "rgw_async_rados_ops", num_threads * 2),
61 req_wq(this, g_conf->rgw_op_thread_timeout,
62 g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
63 }
64
65 void RGWAsyncRadosProcessor::start() {
66 m_tp.start();
67 }
68
69 void RGWAsyncRadosProcessor::stop() {
70 going_down = true;
71 m_tp.drain(&req_wq);
72 m_tp.stop();
73 for (auto iter = m_req_queue.begin(); iter != m_req_queue.end(); ++iter) {
74 (*iter)->put();
75 }
76 }
77
78 void RGWAsyncRadosProcessor::handle_request(RGWAsyncRadosRequest *req) {
79 req->send_request();
80 req->put();
81 }
82
83 void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest *req) {
84 req_throttle.get(1);
85 req_wq.queue(req);
86 }
87
88 int RGWAsyncGetSystemObj::_send_request()
89 {
90 map<string, bufferlist> *pattrs = want_attrs ? &attrs : nullptr;
91
92 return store->get_system_obj(obj_ctx, read_state, &objv_tracker,
93 obj, bl, ofs, end, pattrs, nullptr);
94 }
95
96 RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
97 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
98 off_t _ofs, off_t _end, bool want_attrs)
99 : RGWAsyncRadosRequest(caller, cn), store(_store), obj_ctx(_store),
100 obj(_obj), ofs(_ofs), end(_end), want_attrs(want_attrs)
101 {
102 if (_objv_tracker) {
103 objv_tracker = *_objv_tracker;
104 }
105 }
106
107 int RGWSimpleRadosReadAttrsCR::send_request()
108 {
109 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
110 store, nullptr, obj, 0, -1, true);
111 async_rados->queue(req);
112 return 0;
113 }
114
115 int RGWSimpleRadosReadAttrsCR::request_complete()
116 {
117 if (pattrs) {
118 *pattrs = std::move(req->attrs);
119 }
120 return req->get_ret_status();
121 }
122
123 int RGWAsyncPutSystemObj::_send_request()
124 {
125 return store->put_system_obj_data(NULL, obj, bl, -1, exclusive, &objv_tracker);
126 }
127
128 RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
129 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
130 bool _exclusive, bufferlist _bl)
131 : RGWAsyncRadosRequest(caller, cn), store(_store),
132 obj(_obj), exclusive(_exclusive), bl(std::move(_bl))
133 {
134 if (_objv_tracker) {
135 objv_tracker = *_objv_tracker;
136 }
137 }
138
139 int RGWAsyncPutSystemObjAttrs::_send_request()
140 {
141 return store->system_obj_set_attrs(nullptr, obj, attrs, nullptr, &objv_tracker);
142 }
143
144 RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
145 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
146 map<string, bufferlist> _attrs)
147 : RGWAsyncRadosRequest(caller, cn), store(_store),
148 obj(_obj), attrs(std::move(_attrs))
149 {
150 }
151
152
153 RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj,
154 uint64_t _window_size)
155 : RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
156 store(_store), obj(_obj), going_down(false), num_pending_entries(0), window_size(_window_size), total_entries(0)
157 {
158 }
159
160 int RGWAsyncLockSystemObj::_send_request()
161 {
162 rgw_rados_ref ref;
163 int r = store->get_raw_obj_ref(obj, &ref);
164 if (r < 0) {
165 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
166 return r;
167 }
168
169 rados::cls::lock::Lock l(lock_name);
170 utime_t duration(duration_secs, 0);
171 l.set_duration(duration);
172 l.set_cookie(cookie);
173 l.set_renew(true);
174
175 return l.lock_exclusive(&ref.ioctx, ref.oid);
176 }
177
178 RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
179 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
180 const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(caller, cn), store(_store),
181 obj(_obj),
182 lock_name(_name),
183 cookie(_cookie),
184 duration_secs(_duration_secs)
185 {
186 }
187
188 int RGWAsyncUnlockSystemObj::_send_request()
189 {
190 rgw_rados_ref ref;
191 int r = store->get_raw_obj_ref(obj, &ref);
192 if (r < 0) {
193 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
194 return r;
195 }
196
197 rados::cls::lock::Lock l(lock_name);
198
199 l.set_cookie(cookie);
200
201 return l.unlock(&ref.ioctx, ref.oid);
202 }
203
204 RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
205 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
206 const string& _name, const string& _cookie) : RGWAsyncRadosRequest(caller, cn), store(_store),
207 obj(_obj),
208 lock_name(_name), cookie(_cookie)
209 {
210 }
211
212 RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(RGWRados *_store,
213 const rgw_raw_obj& _obj,
214 map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
215 store(_store),
216 entries(_entries),
217 obj(_obj), cn(NULL)
218 {
219 stringstream& s = set_description();
220 s << "set omap keys dest=" << obj << " keys=[" << s.str() << "]";
221 for (auto i = entries.begin(); i != entries.end(); ++i) {
222 if (i != entries.begin()) {
223 s << ", ";
224 }
225 s << i->first;
226 }
227 s << "]";
228 }
229
230 int RGWRadosSetOmapKeysCR::send_request()
231 {
232 int r = store->get_raw_obj_ref(obj, &ref);
233 if (r < 0) {
234 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
235 return r;
236 }
237
238 set_status() << "sending request";
239
240 librados::ObjectWriteOperation op;
241 op.omap_set(entries);
242
243 cn = stack->create_completion_notifier();
244 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op);
245 }
246
247 int RGWRadosSetOmapKeysCR::request_complete()
248 {
249 int r = cn->completion()->get_return_value();
250
251 set_status() << "request complete; ret=" << r;
252
253 return r;
254 }
255
256 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store,
257 const rgw_raw_obj& _obj,
258 const string& _marker,
259 std::set<std::string> *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()),
260 store(_store),
261 marker(_marker),
262 entries(_entries), max_entries(_max_entries),
263 obj(_obj), cn(NULL)
264 {
265 set_description() << "set omap keys dest=" << obj << " marker=" << marker;
266 }
267
268 int RGWRadosGetOmapKeysCR::send_request() {
269 int r = store->get_raw_obj_ref(obj, &ref);
270 if (r < 0) {
271 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
272 return r;
273 }
274
275 set_status() << "send request";
276
277 librados::ObjectReadOperation op;
278 op.omap_get_keys2(marker, max_entries, entries, nullptr, nullptr);
279
280 cn = stack->create_completion_notifier();
281 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op, NULL);
282 }
283
284 int RGWRadosGetOmapKeysCR::request_complete()
285 {
286 int r = cn->completion()->get_return_value();
287
288 set_status() << "request complete; ret=" << r;
289
290 return r;
291 }
292
293 RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(RGWRados *_store,
294 const rgw_raw_obj& _obj,
295 const set<string>& _keys) : RGWSimpleCoroutine(_store->ctx()),
296 store(_store),
297 keys(_keys),
298 obj(_obj), cn(NULL)
299 {
300 set_description() << "remove omap keys dest=" << obj << " keys=" << keys;
301 }
302
303 int RGWRadosRemoveOmapKeysCR::send_request() {
304 int r = store->get_raw_obj_ref(obj, &ref);
305 if (r < 0) {
306 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
307 return r;
308 }
309
310 set_status() << "send request";
311
312 librados::ObjectWriteOperation op;
313 op.omap_rm_keys(keys);
314
315 cn = stack->create_completion_notifier();
316 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op);
317 }
318
319 int RGWRadosRemoveOmapKeysCR::request_complete()
320 {
321 int r = cn->completion()->get_return_value();
322
323 set_status() << "request complete; ret=" << r;
324
325 return r;
326 }
327
328 RGWRadosRemoveCR::RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj)
329 : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj)
330 {
331 set_description() << "remove dest=" << obj;
332 }
333
334 int RGWRadosRemoveCR::send_request()
335 {
336 auto rados = store->get_rados_handle();
337 int r = rados->ioctx_create(obj.pool.name.c_str(), ioctx);
338 if (r < 0) {
339 lderr(cct) << "ERROR: failed to open pool (" << obj.pool.name << ") ret=" << r << dendl;
340 return r;
341 }
342 ioctx.locator_set_key(obj.loc);
343
344 set_status() << "send request";
345
346 librados::ObjectWriteOperation op;
347 op.remove();
348
349 cn = stack->create_completion_notifier();
350 return ioctx.aio_operate(obj.oid, cn->completion(), &op);
351 }
352
353 int RGWRadosRemoveCR::request_complete()
354 {
355 int r = cn->completion()->get_return_value();
356
357 set_status() << "request complete; ret=" << r;
358
359 return r;
360 }
361
362 RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
363 const rgw_raw_obj& _obj,
364 const string& _lock_name,
365 const string& _cookie,
366 uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
367 async_rados(_async_rados),
368 store(_store),
369 lock_name(_lock_name),
370 cookie(_cookie),
371 duration(_duration),
372 obj(_obj),
373 req(NULL)
374 {
375 set_description() << "rados lock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie << " duration=" << duration;
376 }
377
378 void RGWSimpleRadosLockCR::request_cleanup()
379 {
380 if (req) {
381 req->finish();
382 req = NULL;
383 }
384 }
385
386 int RGWSimpleRadosLockCR::send_request()
387 {
388 set_status() << "sending request";
389 req = new RGWAsyncLockSystemObj(this, stack->create_completion_notifier(),
390 store, NULL, obj, lock_name, cookie, duration);
391 async_rados->queue(req);
392 return 0;
393 }
394
395 int RGWSimpleRadosLockCR::request_complete()
396 {
397 set_status() << "request complete; ret=" << req->get_ret_status();
398 return req->get_ret_status();
399 }
400
401 RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
402 const rgw_raw_obj& _obj,
403 const string& _lock_name,
404 const string& _cookie) : RGWSimpleCoroutine(_store->ctx()),
405 async_rados(_async_rados),
406 store(_store),
407 lock_name(_lock_name),
408 cookie(_cookie),
409 obj(_obj),
410 req(NULL)
411 {
412 set_description() << "rados unlock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie;
413 }
414
415 void RGWSimpleRadosUnlockCR::request_cleanup()
416 {
417 if (req) {
418 req->finish();
419 req = NULL;
420 }
421 }
422
423 int RGWSimpleRadosUnlockCR::send_request()
424 {
425 set_status() << "sending request";
426
427 req = new RGWAsyncUnlockSystemObj(this, stack->create_completion_notifier(),
428 store, NULL, obj, lock_name, cookie);
429 async_rados->queue(req);
430 return 0;
431 }
432
433 int RGWSimpleRadosUnlockCR::request_complete()
434 {
435 set_status() << "request complete; ret=" << req->get_ret_status();
436 return req->get_ret_status();
437 }
438
439 int RGWOmapAppend::operate() {
440 reenter(this) {
441 for (;;) {
442 if (!has_product() && going_down) {
443 set_status() << "going down";
444 break;
445 }
446 set_status() << "waiting for product";
447 yield wait_for_product();
448 yield {
449 string entry;
450 while (consume(&entry)) {
451 set_status() << "adding entry: " << entry;
452 entries[entry] = bufferlist();
453 if (entries.size() >= window_size) {
454 break;
455 }
456 }
457 if (entries.size() >= window_size || going_down) {
458 set_status() << "flushing to omap";
459 call(new RGWRadosSetOmapKeysCR(store, obj, entries));
460 entries.clear();
461 }
462 }
463 if (get_ret_status() < 0) {
464 ldout(cct, 0) << "ERROR: failed to store entries in omap" << dendl;
465 return set_state(RGWCoroutine_Error);
466 }
467 }
468 /* done with coroutine */
469 return set_state(RGWCoroutine_Done);
470 }
471 return 0;
472 }
473
474 void RGWOmapAppend::flush_pending() {
475 receive(pending_entries);
476 num_pending_entries = 0;
477 }
478
479 bool RGWOmapAppend::append(const string& s) {
480 if (is_done()) {
481 return false;
482 }
483 ++total_entries;
484 pending_entries.push_back(s);
485 if (++num_pending_entries >= (int)window_size) {
486 flush_pending();
487 }
488 return true;
489 }
490
491 bool RGWOmapAppend::finish() {
492 going_down = true;
493 flush_pending();
494 set_sleeping(false);
495 return (!is_done());
496 }
497
498 int RGWAsyncGetBucketInstanceInfo::_send_request()
499 {
500 RGWObjectCtx obj_ctx(store);
501 int r = store->get_bucket_instance_from_oid(obj_ctx, oid, bucket_info, NULL, NULL);
502 if (r < 0) {
503 ldout(store->ctx(), 0) << "ERROR: failed to get bucket instance info for "
504 << oid << dendl;
505 return r;
506 }
507
508 return 0;
509 }
510
511 RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(RGWRados *store,
512 const RGWBucketInfo& bucket_info,
513 int shard_id,
514 const std::string& start_marker,
515 const std::string& end_marker)
516 : RGWSimpleCoroutine(store->ctx()), bs(store),
517 start_marker(BucketIndexShardsManager::get_shard_marker(start_marker)),
518 end_marker(BucketIndexShardsManager::get_shard_marker(end_marker))
519 {
520 bs.init(bucket_info, shard_id);
521 }
522
523 int RGWRadosBILogTrimCR::send_request()
524 {
525 bufferlist in;
526 cls_rgw_bi_log_trim_op call;
527 call.start_marker = std::move(start_marker);
528 call.end_marker = std::move(end_marker);
529 ::encode(call, in);
530
531 librados::ObjectWriteOperation op;
532 op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
533
534 cn = stack->create_completion_notifier();
535 return bs.index_ctx.aio_operate(bs.bucket_obj, cn->completion(), &op);
536 }
537
538 int RGWRadosBILogTrimCR::request_complete()
539 {
540 int r = cn->completion()->get_return_value();
541 set_status() << "request complete; ret=" << r;
542 return r;
543 }
544
545 int RGWAsyncFetchRemoteObj::_send_request()
546 {
547 RGWObjectCtx obj_ctx(store);
548
549 string user_id;
550 char buf[16];
551 snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
552 string client_id = store->zone_id() + buf;
553 string op_id = store->unique_id(store->get_new_req_id());
554 map<string, bufferlist> attrs;
555
556 rgw_obj src_obj(bucket_info.bucket, key);
557
558 rgw_obj dest_obj(src_obj);
559
560 int r = store->fetch_remote_obj(obj_ctx,
561 user_id,
562 client_id,
563 op_id,
564 false, /* don't record op state in ops log */
565 NULL, /* req_info */
566 source_zone,
567 dest_obj,
568 src_obj,
569 bucket_info, /* dest */
570 bucket_info, /* source */
571 NULL, /* real_time* src_mtime, */
572 NULL, /* real_time* mtime, */
573 NULL, /* const real_time* mod_ptr, */
574 NULL, /* const real_time* unmod_ptr, */
575 false, /* high precision time */
576 NULL, /* const char *if_match, */
577 NULL, /* const char *if_nomatch, */
578 RGWRados::ATTRSMOD_NONE,
579 copy_if_newer,
580 attrs,
581 RGW_OBJ_CATEGORY_MAIN,
582 versioned_epoch,
583 real_time(), /* delete_at */
584 &key.instance, /* string *version_id, */
585 NULL, /* string *ptag, */
586 NULL, /* string *petag, */
587 NULL, /* void (*progress_cb)(off_t, void *), */
588 NULL, /* void *progress_data*); */
589 &zones_trace);
590
591 if (r < 0) {
592 ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
593 }
594 return r;
595 }
596
597 int RGWAsyncStatRemoteObj::_send_request()
598 {
599 RGWObjectCtx obj_ctx(store);
600
601 string user_id;
602 char buf[16];
603 snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
604 string client_id = store->zone_id() + buf;
605 string op_id = store->unique_id(store->get_new_req_id());
606
607 rgw_obj src_obj(bucket_info.bucket, key);
608
609 rgw_obj dest_obj(src_obj);
610
611 int r = store->stat_remote_obj(obj_ctx,
612 user_id,
613 client_id,
614 nullptr, /* req_info */
615 source_zone,
616 src_obj,
617 bucket_info, /* source */
618 pmtime, /* real_time* src_mtime, */
619 psize, /* uint64_t * */
620 nullptr, /* const real_time* mod_ptr, */
621 nullptr, /* const real_time* unmod_ptr, */
622 true, /* high precision time */
623 nullptr, /* const char *if_match, */
624 nullptr, /* const char *if_nomatch, */
625 pattrs,
626 nullptr,
627 nullptr, /* string *ptag, */
628 nullptr); /* string *petag, */
629
630 if (r < 0) {
631 ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
632 }
633 return r;
634 }
635
636
637 int RGWAsyncRemoveObj::_send_request()
638 {
639 RGWObjectCtx obj_ctx(store);
640
641 rgw_obj obj(bucket_info.bucket, key);
642
643 ldout(store->ctx(), 0) << __func__ << "(): deleting obj=" << obj << dendl;
644
645 obj_ctx.obj.set_atomic(obj);
646
647 RGWObjState *state;
648
649 int ret = store->get_obj_state(&obj_ctx, bucket_info, obj, &state);
650 if (ret < 0) {
651 ldout(store->ctx(), 20) << __func__ << "(): get_obj_state() obj=" << obj << " returned ret=" << ret << dendl;
652 return ret;
653 }
654
655 /* has there been any racing object write? */
656 if (del_if_older && (state->mtime > timestamp)) {
657 ldout(store->ctx(), 20) << __func__ << "(): skipping object removal obj=" << obj << " (obj mtime=" << state->mtime << ", request timestamp=" << timestamp << ")" << dendl;
658 return 0;
659 }
660
661 RGWAccessControlPolicy policy;
662
663 /* decode policy */
664 map<string, bufferlist>::iterator iter = state->attrset.find(RGW_ATTR_ACL);
665 if (iter != state->attrset.end()) {
666 bufferlist::iterator bliter = iter->second.begin();
667 try {
668 policy.decode(bliter);
669 } catch (buffer::error& err) {
670 ldout(store->ctx(), 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
671 return -EIO;
672 }
673 }
674
675 RGWRados::Object del_target(store, bucket_info, obj_ctx, obj);
676 RGWRados::Object::Delete del_op(&del_target);
677
678 del_op.params.bucket_owner = bucket_info.owner;
679 del_op.params.obj_owner = policy.get_owner();
680 if (del_if_older) {
681 del_op.params.unmod_since = timestamp;
682 }
683 if (versioned) {
684 del_op.params.versioning_status = BUCKET_VERSIONED;
685 }
686 del_op.params.olh_epoch = versioned_epoch;
687 del_op.params.marker_version_id = marker_version_id;
688 del_op.params.obj_owner.set_id(owner);
689 del_op.params.obj_owner.set_name(owner_display_name);
690 del_op.params.mtime = timestamp;
691 del_op.params.high_precision_time = true;
692 del_op.params.zones_trace = &zones_trace;
693
694 ret = del_op.delete_obj();
695 if (ret < 0) {
696 ldout(store->ctx(), 20) << __func__ << "(): delete_obj() obj=" << obj << " returned ret=" << ret << dendl;
697 }
698 return ret;
699 }
700
701 int RGWContinuousLeaseCR::operate()
702 {
703 if (aborted) {
704 caller->set_sleeping(false);
705 return set_cr_done();
706 }
707 reenter(this) {
708 while (!going_down) {
709 yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
710
711 caller->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
712 if (retcode < 0) {
713 set_locked(false);
714 ldout(store->ctx(), 20) << *this << ": couldn't lock " << obj << ":" << lock_name << ": retcode=" << retcode << dendl;
715 return set_state(RGWCoroutine_Error, retcode);
716 }
717 set_locked(true);
718 yield wait(utime_t(interval / 2, 0));
719 }
720 set_locked(false); /* moot at this point anyway */
721 yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie));
722 return set_state(RGWCoroutine_Done);
723 }
724 return 0;
725 }
726
727 RGWRadosTimelogAddCR::RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
728 const cls_log_entry& entry) : RGWSimpleCoroutine(_store->ctx()),
729 store(_store),
730 oid(_oid), cn(NULL)
731 {
732 stringstream& s = set_description();
733 s << "timelog add entry oid=" << oid << "entry={id=" << entry.id << ", section=" << entry.section << ", name=" << entry.name << "}";
734 entries.push_back(entry);
735 }
736
737 int RGWRadosTimelogAddCR::send_request()
738 {
739 set_status() << "sending request";
740
741 cn = stack->create_completion_notifier();
742 return store->time_log_add(oid, entries, cn->completion(), true);
743 }
744
745 int RGWRadosTimelogAddCR::request_complete()
746 {
747 int r = cn->completion()->get_return_value();
748
749 set_status() << "request complete; ret=" << r;
750
751 return r;
752 }
753
754 RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(RGWRados *store,
755 const std::string& oid,
756 const real_time& start_time,
757 const real_time& end_time,
758 const std::string& from_marker,
759 const std::string& to_marker)
760 : RGWSimpleCoroutine(store->ctx()), store(store), oid(oid),
761 start_time(start_time), end_time(end_time),
762 from_marker(from_marker), to_marker(to_marker)
763 {
764 set_description() << "timelog trim oid=" << oid
765 << " start_time=" << start_time << " end_time=" << end_time
766 << " from_marker=" << from_marker << " to_marker=" << to_marker;
767 }
768
769 int RGWRadosTimelogTrimCR::send_request()
770 {
771 set_status() << "sending request";
772
773 cn = stack->create_completion_notifier();
774 return store->time_log_trim(oid, start_time, end_time, from_marker,
775 to_marker, cn->completion());
776 }
777
778 int RGWRadosTimelogTrimCR::request_complete()
779 {
780 int r = cn->completion()->get_return_value();
781
782 set_status() << "request complete; ret=" << r;
783
784 return r;
785 }
786
787
788 RGWSyncLogTrimCR::RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
789 const std::string& to_marker,
790 std::string *last_trim_marker)
791 : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{},
792 std::string{}, to_marker),
793 cct(store->ctx()), last_trim_marker(last_trim_marker)
794 {
795 }
796
797 int RGWSyncLogTrimCR::request_complete()
798 {
799 int r = RGWRadosTimelogTrimCR::request_complete();
800 if (r < 0 && r != -ENODATA) {
801 return r;
802 }
803 if (*last_trim_marker < to_marker) {
804 *last_trim_marker = to_marker;
805 }
806 return 0;
807 }
808
809
810 int RGWAsyncStatObj::_send_request()
811 {
812 rgw_raw_obj raw_obj;
813 store->obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
814 return store->raw_obj_stat(raw_obj, psize, pmtime, pepoch,
815 nullptr, nullptr, objv_tracker);
816 }
817
818 RGWStatObjCR::RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
819 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize,
820 real_time* pmtime, uint64_t *pepoch,
821 RGWObjVersionTracker *objv_tracker)
822 : RGWSimpleCoroutine(store->ctx()), store(store), async_rados(async_rados),
823 bucket_info(_bucket_info), obj(obj), psize(psize), pmtime(pmtime), pepoch(pepoch),
824 objv_tracker(objv_tracker)
825 {
826 }
827
828 void RGWStatObjCR::request_cleanup()
829 {
830 if (req) {
831 req->finish();
832 req = NULL;
833 }
834 }
835
836 int RGWStatObjCR::send_request()
837 {
838 req = new RGWAsyncStatObj(this, stack->create_completion_notifier(),
839 store, bucket_info, obj, psize, pmtime, pepoch, objv_tracker);
840 async_rados->queue(req);
841 return 0;
842 }
843
844 int RGWStatObjCR::request_complete()
845 {
846 return req->get_ret_status();
847 }
848
849 RGWRadosNotifyCR::RGWRadosNotifyCR(RGWRados *store, const rgw_raw_obj& obj,
850 bufferlist& request, uint64_t timeout_ms,
851 bufferlist *response)
852 : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj),
853 request(request), timeout_ms(timeout_ms), response(response)
854 {
855 set_description() << "notify dest=" << obj;
856 }
857
858 int RGWRadosNotifyCR::send_request()
859 {
860 int r = store->get_raw_obj_ref(obj, &ref);
861 if (r < 0) {
862 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
863 return r;
864 }
865
866 set_status() << "sending request";
867
868 cn = stack->create_completion_notifier();
869 return ref.ioctx.aio_notify(ref.oid, cn->completion(), request,
870 timeout_ms, response);
871 }
872
873 int RGWRadosNotifyCR::request_complete()
874 {
875 int r = cn->completion()->get_return_value();
876
877 set_status() << "request complete; ret=" << r;
878
879 return r;
880 }