]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rados.cc
update sources to 12.2.7
[ceph.git] / ceph / src / rgw / rgw_cr_rados.cc
1 #include "rgw_rados.h"
2 #include "rgw_coroutine.h"
3 #include "rgw_cr_rados.h"
4
5 #include "cls/lock/cls_lock_client.h"
6 #include "cls/rgw/cls_rgw_client.h"
7
8 #include <boost/asio/yield.hpp>
9
10 #define dout_context g_ceph_context
11 #define dout_subsys ceph_subsys_rgw
12
13 bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest *req) {
14 if (processor->is_going_down()) {
15 return false;
16 }
17 req->get();
18 processor->m_req_queue.push_back(req);
19 dout(20) << "enqueued request req=" << hex << req << dec << dendl;
20 _dump_queue();
21 return true;
22 }
23
24 bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
25 return processor->m_req_queue.empty();
26 }
27
28 RGWAsyncRadosRequest *RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
29 if (processor->m_req_queue.empty())
30 return NULL;
31 RGWAsyncRadosRequest *req = processor->m_req_queue.front();
32 processor->m_req_queue.pop_front();
33 dout(20) << "dequeued request req=" << hex << req << dec << dendl;
34 _dump_queue();
35 return req;
36 }
37
38 void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) {
39 processor->handle_request(req);
40 processor->req_throttle.put(1);
41 }
42
43 void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
44 if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
45 return;
46 }
47 deque<RGWAsyncRadosRequest *>::iterator iter;
48 if (processor->m_req_queue.empty()) {
49 dout(20) << "RGWWQ: empty" << dendl;
50 return;
51 }
52 dout(20) << "RGWWQ:" << dendl;
53 for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) {
54 dout(20) << "req: " << hex << *iter << dec << dendl;
55 }
56 }
57
58 RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(RGWRados *_store, int num_threads)
59 : store(_store), m_tp(store->ctx(), "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
60 req_throttle(store->ctx(), "rgw_async_rados_ops", num_threads * 2),
61 req_wq(this, g_conf->rgw_op_thread_timeout,
62 g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
63 }
64
65 void RGWAsyncRadosProcessor::start() {
66 m_tp.start();
67 }
68
69 void RGWAsyncRadosProcessor::stop() {
70 going_down = true;
71 m_tp.drain(&req_wq);
72 m_tp.stop();
73 for (auto iter = m_req_queue.begin(); iter != m_req_queue.end(); ++iter) {
74 (*iter)->put();
75 }
76 }
77
78 void RGWAsyncRadosProcessor::handle_request(RGWAsyncRadosRequest *req) {
79 req->send_request();
80 req->put();
81 }
82
83 void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest *req) {
84 req_throttle.get(1);
85 req_wq.queue(req);
86 }
87
88 int RGWAsyncGetSystemObj::_send_request()
89 {
90 return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, pattrs, NULL);
91 }
92
93 RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
94 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
95 bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(caller, cn), store(_store), obj_ctx(_obj_ctx),
96 objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl), pattrs(NULL),
97 ofs(_ofs), end(_end)
98 {
99 }
100
101 int RGWSimpleRadosReadAttrsCR::send_request()
102 {
103 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
104 store, &obj_ctx, NULL,
105 obj,
106 &bl, 0, -1);
107 if (pattrs) {
108 req->set_read_attrs(pattrs);
109 }
110 async_rados->queue(req);
111 return 0;
112 }
113
114 int RGWSimpleRadosReadAttrsCR::request_complete()
115 {
116 return req->get_ret_status();
117 }
118
119 int RGWAsyncPutSystemObj::_send_request()
120 {
121 return store->put_system_obj_data(NULL, obj, bl, -1, exclusive, objv_tracker);
122 }
123
124 RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
125 RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
126 bool _exclusive, bufferlist& _bl)
127 : RGWAsyncRadosRequest(caller, cn), store(_store), objv_tracker(_objv_tracker),
128 obj(_obj), exclusive(_exclusive), bl(_bl)
129 {
130 }
131
132 int RGWAsyncPutSystemObjAttrs::_send_request()
133 {
134 return store->system_obj_set_attrs(NULL, obj, *attrs, NULL, objv_tracker);
135 }
136
137 RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
138 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
139 map<string, bufferlist> *_attrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
140 objv_tracker(_objv_tracker), obj(_obj),
141 attrs(_attrs)
142 {
143 }
144
145
146 RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj,
147 uint64_t _window_size)
148 : RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
149 store(_store), obj(_obj), going_down(false), num_pending_entries(0), window_size(_window_size), total_entries(0)
150 {
151 }
152
153 int RGWAsyncLockSystemObj::_send_request()
154 {
155 rgw_rados_ref ref;
156 int r = store->get_raw_obj_ref(obj, &ref);
157 if (r < 0) {
158 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
159 return r;
160 }
161
162 rados::cls::lock::Lock l(lock_name);
163 utime_t duration(duration_secs, 0);
164 l.set_duration(duration);
165 l.set_cookie(cookie);
166 l.set_renew(true);
167
168 return l.lock_exclusive(&ref.ioctx, ref.oid);
169 }
170
171 RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
172 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
173 const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(caller, cn), store(_store),
174 obj(_obj),
175 lock_name(_name),
176 cookie(_cookie),
177 duration_secs(_duration_secs)
178 {
179 }
180
181 int RGWAsyncUnlockSystemObj::_send_request()
182 {
183 rgw_rados_ref ref;
184 int r = store->get_raw_obj_ref(obj, &ref);
185 if (r < 0) {
186 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
187 return r;
188 }
189
190 rados::cls::lock::Lock l(lock_name);
191
192 l.set_cookie(cookie);
193
194 return l.unlock(&ref.ioctx, ref.oid);
195 }
196
197 RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
198 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
199 const string& _name, const string& _cookie) : RGWAsyncRadosRequest(caller, cn), store(_store),
200 obj(_obj),
201 lock_name(_name), cookie(_cookie)
202 {
203 }
204
205 RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(RGWRados *_store,
206 const rgw_raw_obj& _obj,
207 map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
208 store(_store),
209 entries(_entries),
210 obj(_obj), cn(NULL)
211 {
212 stringstream& s = set_description();
213 s << "set omap keys dest=" << obj << " keys=[" << s.str() << "]";
214 for (auto i = entries.begin(); i != entries.end(); ++i) {
215 if (i != entries.begin()) {
216 s << ", ";
217 }
218 s << i->first;
219 }
220 s << "]";
221 }
222
223 int RGWRadosSetOmapKeysCR::send_request()
224 {
225 int r = store->get_raw_obj_ref(obj, &ref);
226 if (r < 0) {
227 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
228 return r;
229 }
230
231 set_status() << "sending request";
232
233 librados::ObjectWriteOperation op;
234 op.omap_set(entries);
235
236 cn = stack->create_completion_notifier();
237 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op);
238 }
239
240 int RGWRadosSetOmapKeysCR::request_complete()
241 {
242 int r = cn->completion()->get_return_value();
243
244 set_status() << "request complete; ret=" << r;
245
246 return r;
247 }
248
249 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store,
250 const rgw_raw_obj& _obj,
251 const string& _marker,
252 std::set<std::string> *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()),
253 store(_store),
254 marker(_marker),
255 entries(_entries), max_entries(_max_entries),
256 obj(_obj), cn(NULL)
257 {
258 set_description() << "set omap keys dest=" << obj << " marker=" << marker;
259 }
260
261 int RGWRadosGetOmapKeysCR::send_request() {
262 int r = store->get_raw_obj_ref(obj, &ref);
263 if (r < 0) {
264 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
265 return r;
266 }
267
268 set_status() << "send request";
269
270 librados::ObjectReadOperation op;
271 op.omap_get_keys2(marker, max_entries, entries, nullptr, nullptr);
272
273 cn = stack->create_completion_notifier();
274 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op, NULL);
275 }
276
277 int RGWRadosGetOmapKeysCR::request_complete()
278 {
279 int r = cn->completion()->get_return_value();
280
281 set_status() << "request complete; ret=" << r;
282
283 return r;
284 }
285
286 RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(RGWRados *_store,
287 const rgw_raw_obj& _obj,
288 const set<string>& _keys) : RGWSimpleCoroutine(_store->ctx()),
289 store(_store),
290 keys(_keys),
291 obj(_obj), cn(NULL)
292 {
293 set_description() << "remove omap keys dest=" << obj << " keys=" << keys;
294 }
295
296 int RGWRadosRemoveOmapKeysCR::send_request() {
297 int r = store->get_raw_obj_ref(obj, &ref);
298 if (r < 0) {
299 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
300 return r;
301 }
302
303 set_status() << "send request";
304
305 librados::ObjectWriteOperation op;
306 op.omap_rm_keys(keys);
307
308 cn = stack->create_completion_notifier();
309 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op);
310 }
311
312 int RGWRadosRemoveOmapKeysCR::request_complete()
313 {
314 int r = cn->completion()->get_return_value();
315
316 set_status() << "request complete; ret=" << r;
317
318 return r;
319 }
320
321 RGWRadosRemoveCR::RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj)
322 : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj)
323 {
324 set_description() << "remove dest=" << obj;
325 }
326
327 int RGWRadosRemoveCR::send_request()
328 {
329 auto rados = store->get_rados_handle();
330 int r = rados->ioctx_create(obj.pool.name.c_str(), ioctx);
331 if (r < 0) {
332 lderr(cct) << "ERROR: failed to open pool (" << obj.pool.name << ") ret=" << r << dendl;
333 return r;
334 }
335 ioctx.locator_set_key(obj.loc);
336
337 set_status() << "send request";
338
339 librados::ObjectWriteOperation op;
340 op.remove();
341
342 cn = stack->create_completion_notifier();
343 return ioctx.aio_operate(obj.oid, cn->completion(), &op);
344 }
345
346 int RGWRadosRemoveCR::request_complete()
347 {
348 int r = cn->completion()->get_return_value();
349
350 set_status() << "request complete; ret=" << r;
351
352 return r;
353 }
354
355 RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
356 const rgw_raw_obj& _obj,
357 const string& _lock_name,
358 const string& _cookie,
359 uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
360 async_rados(_async_rados),
361 store(_store),
362 lock_name(_lock_name),
363 cookie(_cookie),
364 duration(_duration),
365 obj(_obj),
366 req(NULL)
367 {
368 set_description() << "rados lock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie << " duration=" << duration;
369 }
370
371 void RGWSimpleRadosLockCR::request_cleanup()
372 {
373 if (req) {
374 req->finish();
375 req = NULL;
376 }
377 }
378
379 int RGWSimpleRadosLockCR::send_request()
380 {
381 set_status() << "sending request";
382 req = new RGWAsyncLockSystemObj(this, stack->create_completion_notifier(),
383 store, NULL, obj, lock_name, cookie, duration);
384 async_rados->queue(req);
385 return 0;
386 }
387
388 int RGWSimpleRadosLockCR::request_complete()
389 {
390 set_status() << "request complete; ret=" << req->get_ret_status();
391 return req->get_ret_status();
392 }
393
394 RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
395 const rgw_raw_obj& _obj,
396 const string& _lock_name,
397 const string& _cookie) : RGWSimpleCoroutine(_store->ctx()),
398 async_rados(_async_rados),
399 store(_store),
400 lock_name(_lock_name),
401 cookie(_cookie),
402 obj(_obj),
403 req(NULL)
404 {
405 set_description() << "rados unlock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie;
406 }
407
408 void RGWSimpleRadosUnlockCR::request_cleanup()
409 {
410 if (req) {
411 req->finish();
412 req = NULL;
413 }
414 }
415
416 int RGWSimpleRadosUnlockCR::send_request()
417 {
418 set_status() << "sending request";
419
420 req = new RGWAsyncUnlockSystemObj(this, stack->create_completion_notifier(),
421 store, NULL, obj, lock_name, cookie);
422 async_rados->queue(req);
423 return 0;
424 }
425
426 int RGWSimpleRadosUnlockCR::request_complete()
427 {
428 set_status() << "request complete; ret=" << req->get_ret_status();
429 return req->get_ret_status();
430 }
431
/**
 * Coroutine body: consume produced strings and write them as omap keys.
 *
 * Entries are accumulated in `entries` (each with an empty bufferlist value)
 * and flushed through RGWRadosSetOmapKeysCR once window_size entries have
 * been collected, or when going_down is set. The loop ends when going_down is
 * set and no product remains.
 *
 * @return the value of set_state() on completion or error; 0 otherwise
 */
int RGWOmapAppend::operate() {
  reenter(this) {
    for (;;) {
      // Exit only once shutdown was requested AND everything was consumed.
      if (!has_product() && going_down) {
        set_status() << "going down";
        break;
      }
      set_status() << "waiting for product";
      yield wait_for_product();
      yield {
        string entry;
        // Pull entries until the producer queue is empty or the window fills.
        while (consume(&entry)) {
          set_status() << "adding entry: " << entry;
          entries[entry] = bufferlist();
          if (entries.size() >= window_size) {
            break;
          }
        }
        // Flush a full window, or whatever is buffered when shutting down.
        if (entries.size() >= window_size || going_down) {
          set_status() << "flushing to omap";
          call(new RGWRadosSetOmapKeysCR(store, obj, entries));
          // NOTE(review): clearing right after call() presumes the child
          // coroutine keeps its own copy of the entries -- confirm against
          // the RGWRadosSetOmapKeysCR member declaration.
          entries.clear();
        }
      }
      // A negative status here is treated as a failed omap write.
      if (get_ret_status() < 0) {
        ldout(cct, 0) << "ERROR: failed to store entries in omap" << dendl;
        return set_state(RGWCoroutine_Error);
      }
    }
    /* done with coroutine */
    return set_state(RGWCoroutine_Done);
  }
  return 0;
}
466
467 void RGWOmapAppend::flush_pending() {
468 receive(pending_entries);
469 num_pending_entries = 0;
470 }
471
472 bool RGWOmapAppend::append(const string& s) {
473 if (is_done()) {
474 return false;
475 }
476 ++total_entries;
477 pending_entries.push_back(s);
478 if (++num_pending_entries >= (int)window_size) {
479 flush_pending();
480 }
481 return true;
482 }
483
484 bool RGWOmapAppend::finish() {
485 going_down = true;
486 flush_pending();
487 set_sleeping(false);
488 return (!is_done());
489 }
490
491 int RGWAsyncGetBucketInstanceInfo::_send_request()
492 {
493 RGWObjectCtx obj_ctx(store);
494 int r = store->get_bucket_instance_from_oid(obj_ctx, oid, *bucket_info, NULL, NULL);
495 if (r < 0) {
496 ldout(store->ctx(), 0) << "ERROR: failed to get bucket instance info for "
497 << oid << dendl;
498 return r;
499 }
500
501 return 0;
502 }
503
504 RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(RGWRados *store,
505 const RGWBucketInfo& bucket_info,
506 int shard_id,
507 const std::string& start_marker,
508 const std::string& end_marker)
509 : RGWSimpleCoroutine(store->ctx()), bs(store),
510 start_marker(BucketIndexShardsManager::get_shard_marker(start_marker)),
511 end_marker(BucketIndexShardsManager::get_shard_marker(end_marker))
512 {
513 bs.init(bucket_info, shard_id);
514 }
515
516 int RGWRadosBILogTrimCR::send_request()
517 {
518 bufferlist in;
519 cls_rgw_bi_log_trim_op call;
520 call.start_marker = std::move(start_marker);
521 call.end_marker = std::move(end_marker);
522 ::encode(call, in);
523
524 librados::ObjectWriteOperation op;
525 op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
526
527 cn = stack->create_completion_notifier();
528 return bs.index_ctx.aio_operate(bs.bucket_obj, cn->completion(), &op);
529 }
530
531 int RGWRadosBILogTrimCR::request_complete()
532 {
533 int r = cn->completion()->get_return_value();
534 set_status() << "request complete; ret=" << r;
535 return r;
536 }
537
/**
 * Fetch an object from the source zone into the local store.
 *
 * Source and destination use the same bucket info and the same object
 * (dest_obj is a copy of src_obj). The client id is "<zone_id>.<instance_id>"
 * and a fresh op id is generated for the call.
 *
 * @return the result of RGWRados::fetch_remote_obj(); failures are logged at
 *         level 0
 */
int RGWAsyncFetchRemoteObj::_send_request()
{
  RGWObjectCtx obj_ctx(store);

  string user_id; /* left empty */
  char buf[16];
  // NOTE(review): buf holds at most 15 characters plus the terminator;
  // snprintf() truncates a longer ".<instance_id>" -- confirm instance ids
  // stay short enough.
  snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
  string client_id = store->zone_id() + buf;
  string op_id = store->unique_id(store->get_new_req_id());
  map<string, bufferlist> attrs; /* passed empty */

  rgw_obj src_obj(bucket_info.bucket, key);

  rgw_obj dest_obj(src_obj);

  // Arguments are positional; the trailing comments name the parameter each
  // one fills.
  int r = store->fetch_remote_obj(obj_ctx,
                                  user_id,
                                  client_id,
                                  op_id,
                                  false, /* don't record op state in ops log */
                                  NULL, /* req_info */
                                  source_zone,
                                  dest_obj,
                                  src_obj,
                                  bucket_info, /* dest */
                                  bucket_info, /* source */
                                  NULL, /* real_time* src_mtime, */
                                  NULL, /* real_time* mtime, */
                                  NULL, /* const real_time* mod_ptr, */
                                  NULL, /* const real_time* unmod_ptr, */
                                  false, /* high precision time */
                                  NULL, /* const char *if_match, */
                                  NULL, /* const char *if_nomatch, */
                                  RGWRados::ATTRSMOD_NONE,
                                  copy_if_newer,
                                  attrs,
                                  RGW_OBJ_CATEGORY_MAIN,
                                  versioned_epoch,
                                  real_time(), /* delete_at */
                                  &key.instance, /* string *version_id, */
                                  NULL, /* string *ptag, */
                                  NULL, /* string *petag, */
                                  NULL, /* void (*progress_cb)(off_t, void *), */
                                  NULL, /* void *progress_data*); */
                                  zones_trace);

  if (r < 0) {
    ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
  }
  return r;
}
589
590 int RGWAsyncStatRemoteObj::_send_request()
591 {
592 RGWObjectCtx obj_ctx(store);
593
594 string user_id;
595 char buf[16];
596 snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
597 string client_id = store->zone_id() + buf;
598 string op_id = store->unique_id(store->get_new_req_id());
599
600 rgw_obj src_obj(bucket_info.bucket, key);
601
602 rgw_obj dest_obj(src_obj);
603
604 int r = store->stat_remote_obj(obj_ctx,
605 user_id,
606 client_id,
607 nullptr, /* req_info */
608 source_zone,
609 src_obj,
610 bucket_info, /* source */
611 pmtime, /* real_time* src_mtime, */
612 psize, /* uint64_t * */
613 nullptr, /* const real_time* mod_ptr, */
614 nullptr, /* const real_time* unmod_ptr, */
615 true, /* high precision time */
616 nullptr, /* const char *if_match, */
617 nullptr, /* const char *if_nomatch, */
618 pattrs,
619 nullptr,
620 nullptr, /* string *ptag, */
621 nullptr); /* string *petag, */
622
623 if (r < 0) {
624 ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
625 }
626 return r;
627 }
628
629
/**
 * Delete an object locally.
 *
 * Reads the object state first. When del_if_older is set and the object's
 * mtime is newer than `timestamp`, the removal is skipped and 0 is returned.
 * Otherwise the ACL attribute (if present) is decoded to obtain the current
 * owner and a Delete op is issued with the stored parameters.
 *
 * @return 0 when skipped or on success, -EIO if the ACL cannot be decoded,
 *         otherwise the negative error from get_obj_state() or delete_obj()
 */
int RGWAsyncRemoveObj::_send_request()
{
  RGWObjectCtx obj_ctx(store);

  rgw_obj obj(bucket_info.bucket, key);

  ldout(store->ctx(), 0) << __func__ << "(): deleting obj=" << obj << dendl;

  obj_ctx.obj.set_atomic(obj);

  RGWObjState *state;

  int ret = store->get_obj_state(&obj_ctx, bucket_info, obj, &state);
  if (ret < 0) {
    ldout(store->ctx(), 20) << __func__ << "(): get_obj_state() obj=" << obj << " returned ret=" << ret << dendl;
    return ret;
  }

  /* has there been any racing object write? */
  if (del_if_older && (state->mtime > timestamp)) {
    ldout(store->ctx(), 20) << __func__ << "(): skipping object removal obj=" << obj << " (obj mtime=" << state->mtime << ", request timestamp=" << timestamp << ")" << dendl;
    return 0;
  }

  RGWAccessControlPolicy policy;

  /* decode policy */
  // A missing ACL attribute is not an error: policy stays default-constructed.
  map<string, bufferlist>::iterator iter = state->attrset.find(RGW_ATTR_ACL);
  if (iter != state->attrset.end()) {
    bufferlist::iterator bliter = iter->second.begin();
    try {
      policy.decode(bliter);
    } catch (buffer::error& err) {
      ldout(store->ctx(), 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
      return -EIO;
    }
  }

  RGWRados::Object del_target(store, bucket_info, obj_ctx, obj);
  RGWRados::Object::Delete del_op(&del_target);

  del_op.params.bucket_owner = bucket_info.owner;
  del_op.params.obj_owner = policy.get_owner();
  if (del_if_older) {
    del_op.params.unmod_since = timestamp;
  }
  if (versioned) {
    del_op.params.versioning_status = BUCKET_VERSIONED;
  }
  del_op.params.olh_epoch = versioned_epoch;
  del_op.params.marker_version_id = marker_version_id;
  // The id and display name taken from the decoded policy above are
  // overwritten here with the values stored on this request.
  del_op.params.obj_owner.set_id(owner);
  del_op.params.obj_owner.set_name(owner_display_name);
  del_op.params.mtime = timestamp;
  del_op.params.high_precision_time = true;
  del_op.params.zones_trace = zones_trace;

  ret = del_op.delete_obj();
  if (ret < 0) {
    ldout(store->ctx(), 20) << __func__ << "(): delete_obj() obj=" << obj << " returned ret=" << ret << dendl;
  }
  return ret;
}
693
/**
 * Coroutine body: hold a lock on `obj` by renewing it until going_down.
 *
 * Each iteration takes/renews the lock with a duration of `interval`, wakes
 * the caller, and then waits interval / 2 before renewing again. A failed
 * lock attempt ends the coroutine in the error state. Once going_down is
 * set, the lock is released through RGWSimpleRadosUnlockCR.
 *
 * @return the value of set_cr_done()/set_state() on completion or error;
 *         0 otherwise
 */
int RGWContinuousLeaseCR::operate()
{
  // Abort takes priority over everything else: wake the caller and finish.
  if (aborted) {
    caller->set_sleeping(false);
    return set_cr_done();
  }
  reenter(this) {
    while (!going_down) {
      yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));

      caller->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
      if (retcode < 0) {
        set_locked(false);
        ldout(store->ctx(), 20) << *this << ": couldn't lock " << obj << ":" << lock_name << ": retcode=" << retcode << dendl;
        return set_state(RGWCoroutine_Error, retcode);
      }
      set_locked(true);
      // Renew after half the lock duration has elapsed.
      yield wait(utime_t(interval / 2, 0));
    }
    set_locked(false); /* moot at this point anyway */
    yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie));
    return set_state(RGWCoroutine_Done);
  }
  return 0;
}
719
720 RGWRadosTimelogAddCR::RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
721 const cls_log_entry& entry) : RGWSimpleCoroutine(_store->ctx()),
722 store(_store),
723 oid(_oid), cn(NULL)
724 {
725 stringstream& s = set_description();
726 s << "timelog add entry oid=" << oid << "entry={id=" << entry.id << ", section=" << entry.section << ", name=" << entry.name << "}";
727 entries.push_back(entry);
728 }
729
730 int RGWRadosTimelogAddCR::send_request()
731 {
732 set_status() << "sending request";
733
734 cn = stack->create_completion_notifier();
735 return store->time_log_add(oid, entries, cn->completion(), true);
736 }
737
738 int RGWRadosTimelogAddCR::request_complete()
739 {
740 int r = cn->completion()->get_return_value();
741
742 set_status() << "request complete; ret=" << r;
743
744 return r;
745 }
746
747 RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(RGWRados *store,
748 const std::string& oid,
749 const real_time& start_time,
750 const real_time& end_time,
751 const std::string& from_marker,
752 const std::string& to_marker)
753 : RGWSimpleCoroutine(store->ctx()), store(store), oid(oid),
754 start_time(start_time), end_time(end_time),
755 from_marker(from_marker), to_marker(to_marker)
756 {
757 set_description() << "timelog trim oid=" << oid
758 << " start_time=" << start_time << " end_time=" << end_time
759 << " from_marker=" << from_marker << " to_marker=" << to_marker;
760 }
761
762 int RGWRadosTimelogTrimCR::send_request()
763 {
764 set_status() << "sending request";
765
766 cn = stack->create_completion_notifier();
767 return store->time_log_trim(oid, start_time, end_time, from_marker,
768 to_marker, cn->completion());
769 }
770
771 int RGWRadosTimelogTrimCR::request_complete()
772 {
773 int r = cn->completion()->get_return_value();
774
775 set_status() << "request complete; ret=" << r;
776
777 return r;
778 }
779
780
781 RGWSyncLogTrimCR::RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
782 const std::string& to_marker,
783 std::string *last_trim_marker)
784 : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{},
785 std::string{}, to_marker),
786 cct(store->ctx()), last_trim_marker(last_trim_marker)
787 {
788 }
789
790 int RGWSyncLogTrimCR::request_complete()
791 {
792 int r = RGWRadosTimelogTrimCR::request_complete();
793 if (r < 0 && r != -ENODATA) {
794 return r;
795 }
796 if (*last_trim_marker < to_marker) {
797 *last_trim_marker = to_marker;
798 }
799 return 0;
800 }
801
802
803 int RGWAsyncStatObj::_send_request()
804 {
805 rgw_raw_obj raw_obj;
806 store->obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
807 return store->raw_obj_stat(raw_obj, psize, pmtime, pepoch,
808 nullptr, nullptr, objv_tracker);
809 }
810
811 RGWStatObjCR::RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
812 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize,
813 real_time* pmtime, uint64_t *pepoch,
814 RGWObjVersionTracker *objv_tracker)
815 : RGWSimpleCoroutine(store->ctx()), store(store), async_rados(async_rados),
816 bucket_info(_bucket_info), obj(obj), psize(psize), pmtime(pmtime), pepoch(pepoch),
817 objv_tracker(objv_tracker)
818 {
819 }
820
821 void RGWStatObjCR::request_cleanup()
822 {
823 if (req) {
824 req->finish();
825 req = NULL;
826 }
827 }
828
829 int RGWStatObjCR::send_request()
830 {
831 req = new RGWAsyncStatObj(this, stack->create_completion_notifier(),
832 store, bucket_info, obj, psize, pmtime, pepoch, objv_tracker);
833 async_rados->queue(req);
834 return 0;
835 }
836
837 int RGWStatObjCR::request_complete()
838 {
839 return req->get_ret_status();
840 }
841
842 RGWRadosNotifyCR::RGWRadosNotifyCR(RGWRados *store, const rgw_raw_obj& obj,
843 bufferlist& request, uint64_t timeout_ms,
844 bufferlist *response)
845 : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj),
846 request(request), timeout_ms(timeout_ms), response(response)
847 {
848 set_description() << "notify dest=" << obj;
849 }
850
851 int RGWRadosNotifyCR::send_request()
852 {
853 int r = store->get_raw_obj_ref(obj, &ref);
854 if (r < 0) {
855 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
856 return r;
857 }
858
859 set_status() << "sending request";
860
861 cn = stack->create_completion_notifier();
862 return ref.ioctx.aio_notify(ref.oid, cn->completion(), request,
863 timeout_ms, response);
864 }
865
866 int RGWRadosNotifyCR::request_complete()
867 {
868 int r = cn->completion()->get_return_value();
869
870 set_status() << "request complete; ret=" << r;
871
872 return r;
873 }