]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rados.cc
7284c10dc4e4fe49d8bd17e27756618580fc55e2
[ceph.git] / ceph / src / rgw / rgw_cr_rados.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5 #include "rgw_rados.h"
6 #include "rgw_zone.h"
7 #include "rgw_coroutine.h"
8 #include "rgw_cr_rados.h"
9 #include "rgw_sync_counters.h"
10
11 #include "services/svc_zone.h"
12 #include "services/svc_zone_utils.h"
13 #include "services/svc_sys_obj.h"
14
15 #include "cls/lock/cls_lock_client.h"
16 #include "cls/rgw/cls_rgw_client.h"
17
18 #include <boost/asio/yield.hpp>
19
20 #define dout_context g_ceph_context
21 #define dout_subsys ceph_subsys_rgw
22
23 bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest *req) {
24 if (processor->is_going_down()) {
25 return false;
26 }
27 req->get();
28 processor->m_req_queue.push_back(req);
29 dout(20) << "enqueued request req=" << hex << req << dec << dendl;
30 _dump_queue();
31 return true;
32 }
33
34 bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
35 return processor->m_req_queue.empty();
36 }
37
38 RGWAsyncRadosRequest *RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
39 if (processor->m_req_queue.empty())
40 return NULL;
41 RGWAsyncRadosRequest *req = processor->m_req_queue.front();
42 processor->m_req_queue.pop_front();
43 dout(20) << "dequeued request req=" << hex << req << dec << dendl;
44 _dump_queue();
45 return req;
46 }
47
48 void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) {
49 processor->handle_request(req);
50 processor->req_throttle.put(1);
51 }
52
53 void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
54 if (!g_conf()->subsys.should_gather<ceph_subsys_rgw, 20>()) {
55 return;
56 }
57 deque<RGWAsyncRadosRequest *>::iterator iter;
58 if (processor->m_req_queue.empty()) {
59 dout(20) << "RGWWQ: empty" << dendl;
60 return;
61 }
62 dout(20) << "RGWWQ:" << dendl;
63 for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) {
64 dout(20) << "req: " << hex << *iter << dec << dendl;
65 }
66 }
67
68 RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(RGWRados *_store, int num_threads)
69 : store(_store), m_tp(store->ctx(), "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
70 req_throttle(store->ctx(), "rgw_async_rados_ops", num_threads * 2),
71 req_wq(this, g_conf()->rgw_op_thread_timeout,
72 g_conf()->rgw_op_thread_suicide_timeout, &m_tp) {
73 }
74
75 void RGWAsyncRadosProcessor::start() {
76 m_tp.start();
77 }
78
79 void RGWAsyncRadosProcessor::stop() {
80 going_down = true;
81 m_tp.drain(&req_wq);
82 m_tp.stop();
83 for (auto iter = m_req_queue.begin(); iter != m_req_queue.end(); ++iter) {
84 (*iter)->put();
85 }
86 }
87
88 void RGWAsyncRadosProcessor::handle_request(RGWAsyncRadosRequest *req) {
89 req->send_request();
90 req->put();
91 }
92
93 void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest *req) {
94 req_throttle.get(1);
95 req_wq.queue(req);
96 }
97
98 int RGWAsyncGetSystemObj::_send_request()
99 {
100 map<string, bufferlist> *pattrs = want_attrs ? &attrs : nullptr;
101
102 auto sysobj = obj_ctx.get_obj(obj);
103 return sysobj.rop()
104 .set_objv_tracker(&objv_tracker)
105 .set_attrs(pattrs)
106 .set_raw_attrs(raw_attrs)
107 .read(&bl);
108 }
109
110 RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
111 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
112 bool want_attrs, bool raw_attrs)
113 : RGWAsyncRadosRequest(caller, cn), obj_ctx(_svc),
114 obj(_obj), want_attrs(want_attrs), raw_attrs(raw_attrs)
115 {
116 if (_objv_tracker) {
117 objv_tracker = *_objv_tracker;
118 }
119 }
120
121 int RGWSimpleRadosReadAttrsCR::send_request()
122 {
123 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
124 svc, nullptr, obj, true, raw_attrs);
125 async_rados->queue(req);
126 return 0;
127 }
128
129 int RGWSimpleRadosReadAttrsCR::request_complete()
130 {
131 if (pattrs) {
132 *pattrs = std::move(req->attrs);
133 }
134 return req->get_ret_status();
135 }
136
137 int RGWAsyncPutSystemObj::_send_request()
138 {
139 auto obj_ctx = svc->init_obj_ctx();
140 auto sysobj = obj_ctx.get_obj(obj);
141 return sysobj.wop()
142 .set_objv_tracker(&objv_tracker)
143 .set_exclusive(exclusive)
144 .write_data(bl);
145 }
146
147 RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
148 RGWSI_SysObj *_svc,
149 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
150 bool _exclusive, bufferlist _bl)
151 : RGWAsyncRadosRequest(caller, cn), svc(_svc),
152 obj(_obj), exclusive(_exclusive), bl(std::move(_bl))
153 {
154 if (_objv_tracker) {
155 objv_tracker = *_objv_tracker;
156 }
157 }
158
159 int RGWAsyncPutSystemObjAttrs::_send_request()
160 {
161 auto obj_ctx = svc->init_obj_ctx();
162 auto sysobj = obj_ctx.get_obj(obj);
163 return sysobj.wop()
164 .set_objv_tracker(&objv_tracker)
165 .set_exclusive(false)
166 .set_attrs(attrs)
167 .write_attrs();
168 }
169
170 RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
171 RGWSI_SysObj *_svc,
172 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
173 map<string, bufferlist> _attrs)
174 : RGWAsyncRadosRequest(caller, cn), svc(_svc),
175 obj(_obj), attrs(std::move(_attrs))
176 {
177 if (_objv_tracker) {
178 objv_tracker = *_objv_tracker;
179 }
180 }
181
182
183 RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj,
184 uint64_t _window_size)
185 : RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
186 store(_store), obj(_obj), going_down(false), num_pending_entries(0), window_size(_window_size), total_entries(0)
187 {
188 }
189
190 int RGWAsyncLockSystemObj::_send_request()
191 {
192 rgw_rados_ref ref;
193 int r = store->get_raw_obj_ref(obj, &ref);
194 if (r < 0) {
195 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
196 return r;
197 }
198
199 rados::cls::lock::Lock l(lock_name);
200 utime_t duration(duration_secs, 0);
201 l.set_duration(duration);
202 l.set_cookie(cookie);
203 l.set_may_renew(true);
204
205 return l.lock_exclusive(&ref.ioctx, ref.obj.oid);
206 }
207
208 RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
209 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
210 const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(caller, cn), store(_store),
211 obj(_obj),
212 lock_name(_name),
213 cookie(_cookie),
214 duration_secs(_duration_secs)
215 {
216 }
217
218 int RGWAsyncUnlockSystemObj::_send_request()
219 {
220 rgw_rados_ref ref;
221 int r = store->get_raw_obj_ref(obj, &ref);
222 if (r < 0) {
223 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
224 return r;
225 }
226
227 rados::cls::lock::Lock l(lock_name);
228
229 l.set_cookie(cookie);
230
231 return l.unlock(&ref.ioctx, ref.obj.oid);
232 }
233
234 RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
235 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
236 const string& _name, const string& _cookie) : RGWAsyncRadosRequest(caller, cn), store(_store),
237 obj(_obj),
238 lock_name(_name), cookie(_cookie)
239 {
240 }
241
242 RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(RGWRados *_store,
243 const rgw_raw_obj& _obj,
244 map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
245 store(_store),
246 entries(_entries),
247 obj(_obj), cn(NULL)
248 {
249 stringstream& s = set_description();
250 s << "set omap keys dest=" << obj << " keys=[" << s.str() << "]";
251 for (auto i = entries.begin(); i != entries.end(); ++i) {
252 if (i != entries.begin()) {
253 s << ", ";
254 }
255 s << i->first;
256 }
257 s << "]";
258 }
259
260 int RGWRadosSetOmapKeysCR::send_request()
261 {
262 int r = store->get_raw_obj_ref(obj, &ref);
263 if (r < 0) {
264 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
265 return r;
266 }
267
268 set_status() << "sending request";
269
270 librados::ObjectWriteOperation op;
271 op.omap_set(entries);
272
273 cn = stack->create_completion_notifier();
274 return ref.ioctx.aio_operate(ref.obj.oid, cn->completion(), &op);
275 }
276
277 int RGWRadosSetOmapKeysCR::request_complete()
278 {
279 int r = cn->completion()->get_return_value();
280
281 set_status() << "request complete; ret=" << r;
282
283 return r;
284 }
285
286 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store,
287 const rgw_raw_obj& _obj,
288 const string& _marker,
289 int _max_entries,
290 ResultPtr _result)
291 : RGWSimpleCoroutine(_store->ctx()), store(_store), obj(_obj),
292 marker(_marker), max_entries(_max_entries),
293 result(std::move(_result))
294 {
295 ceph_assert(result); // must be allocated
296 set_description() << "get omap keys dest=" << obj << " marker=" << marker;
297 }
298
299 int RGWRadosGetOmapKeysCR::send_request() {
300 int r = store->get_raw_obj_ref(obj, &result->ref);
301 if (r < 0) {
302 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
303 return r;
304 }
305
306 set_status() << "send request";
307
308 librados::ObjectReadOperation op;
309 op.omap_get_keys2(marker, max_entries, &result->entries, &result->more, nullptr);
310
311 cn = stack->create_completion_notifier(result);
312 return result->ref.ioctx.aio_operate(result->ref.obj.oid, cn->completion(), &op, NULL);
313 }
314
315 int RGWRadosGetOmapKeysCR::request_complete()
316 {
317 int r = cn->completion()->get_return_value();
318
319 set_status() << "request complete; ret=" << r;
320
321 return r;
322 }
323
324 RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(RGWRados *_store,
325 const rgw_raw_obj& _obj,
326 const set<string>& _keys) : RGWSimpleCoroutine(_store->ctx()),
327 store(_store),
328 keys(_keys),
329 obj(_obj), cn(NULL)
330 {
331 set_description() << "remove omap keys dest=" << obj << " keys=" << keys;
332 }
333
334 int RGWRadosRemoveOmapKeysCR::send_request() {
335 int r = store->get_raw_obj_ref(obj, &ref);
336 if (r < 0) {
337 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
338 return r;
339 }
340
341 set_status() << "send request";
342
343 librados::ObjectWriteOperation op;
344 op.omap_rm_keys(keys);
345
346 cn = stack->create_completion_notifier();
347 return ref.ioctx.aio_operate(ref.obj.oid, cn->completion(), &op);
348 }
349
350 int RGWRadosRemoveOmapKeysCR::request_complete()
351 {
352 int r = cn->completion()->get_return_value();
353
354 set_status() << "request complete; ret=" << r;
355
356 return r;
357 }
358
359 RGWRadosRemoveCR::RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj)
360 : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj)
361 {
362 set_description() << "remove dest=" << obj;
363 }
364
365 int RGWRadosRemoveCR::send_request()
366 {
367 auto rados = store->get_rados_handle();
368 int r = rados->ioctx_create(obj.pool.name.c_str(), ioctx);
369 if (r < 0) {
370 lderr(cct) << "ERROR: failed to open pool (" << obj.pool.name << ") ret=" << r << dendl;
371 return r;
372 }
373 ioctx.locator_set_key(obj.loc);
374
375 set_status() << "send request";
376
377 librados::ObjectWriteOperation op;
378 op.remove();
379
380 cn = stack->create_completion_notifier();
381 return ioctx.aio_operate(obj.oid, cn->completion(), &op);
382 }
383
384 int RGWRadosRemoveCR::request_complete()
385 {
386 int r = cn->completion()->get_return_value();
387
388 set_status() << "request complete; ret=" << r;
389
390 return r;
391 }
392
393 RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
394 const rgw_raw_obj& _obj,
395 const string& _lock_name,
396 const string& _cookie,
397 uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
398 async_rados(_async_rados),
399 store(_store),
400 lock_name(_lock_name),
401 cookie(_cookie),
402 duration(_duration),
403 obj(_obj),
404 req(NULL)
405 {
406 set_description() << "rados lock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie << " duration=" << duration;
407 }
408
409 void RGWSimpleRadosLockCR::request_cleanup()
410 {
411 if (req) {
412 req->finish();
413 req = NULL;
414 }
415 }
416
417 int RGWSimpleRadosLockCR::send_request()
418 {
419 set_status() << "sending request";
420 req = new RGWAsyncLockSystemObj(this, stack->create_completion_notifier(),
421 store, NULL, obj, lock_name, cookie, duration);
422 async_rados->queue(req);
423 return 0;
424 }
425
426 int RGWSimpleRadosLockCR::request_complete()
427 {
428 set_status() << "request complete; ret=" << req->get_ret_status();
429 return req->get_ret_status();
430 }
431
432 RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
433 const rgw_raw_obj& _obj,
434 const string& _lock_name,
435 const string& _cookie) : RGWSimpleCoroutine(_store->ctx()),
436 async_rados(_async_rados),
437 store(_store),
438 lock_name(_lock_name),
439 cookie(_cookie),
440 obj(_obj),
441 req(NULL)
442 {
443 set_description() << "rados unlock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie;
444 }
445
446 void RGWSimpleRadosUnlockCR::request_cleanup()
447 {
448 if (req) {
449 req->finish();
450 req = NULL;
451 }
452 }
453
454 int RGWSimpleRadosUnlockCR::send_request()
455 {
456 set_status() << "sending request";
457
458 req = new RGWAsyncUnlockSystemObj(this, stack->create_completion_notifier(),
459 store, NULL, obj, lock_name, cookie);
460 async_rados->queue(req);
461 return 0;
462 }
463
464 int RGWSimpleRadosUnlockCR::request_complete()
465 {
466 set_status() << "request complete; ret=" << req->get_ret_status();
467 return req->get_ret_status();
468 }
469
// Coroutine body of the omap appender (stackless coroutine built on the
// reenter/yield macros, so statement order and yield placement matter).
// Each iteration waits for product, drains consumed strings into `entries`
// as keys with empty values, and flushes them to the object's omap through
// RGWRadosSetOmapKeysCR when the window is full or the appender is going down.
// Returns the error state if the flush reported a negative status, the done
// state after the loop ends, and 0 otherwise.
470 int RGWOmapAppend::operate() {
471 reenter(this) {
472 for (;;) {
// Leave the loop only when shutdown was requested and nothing is left to consume.
473 if (!has_product() && going_down) {
474 set_status() << "going down";
475 break;
476 }
477 set_status() << "waiting for product";
478 yield wait_for_product();
479 yield {
480 string entry;
// Consume at most one window's worth of entries per pass.
481 while (consume(&entry)) {
482 set_status() << "adding entry: " << entry;
483 entries[entry] = bufferlist();
484 if (entries.size() >= window_size) {
485 break;
486 }
487 }
// Flush on a full window, or unconditionally once going down.
488 if (entries.size() >= window_size || going_down) {
489 set_status() << "flushing to omap";
490 call(new RGWRadosSetOmapKeysCR(store, obj, entries));
491 entries.clear();
492 }
493 }
494 if (get_ret_status() < 0) {
495 ldout(cct, 0) << "ERROR: failed to store entries in omap" << dendl;
496 return set_state(RGWCoroutine_Error);
497 }
498 }
499 /* done with coroutine */
500 return set_state(RGWCoroutine_Done);
501 }
502 return 0;
503 }
504
505 void RGWOmapAppend::flush_pending() {
506 receive(pending_entries);
507 num_pending_entries = 0;
508 }
509
510 bool RGWOmapAppend::append(const string& s) {
511 if (is_done()) {
512 return false;
513 }
514 ++total_entries;
515 pending_entries.push_back(s);
516 if (++num_pending_entries >= (int)window_size) {
517 flush_pending();
518 }
519 return true;
520 }
521
522 bool RGWOmapAppend::finish() {
523 going_down = true;
524 flush_pending();
525 set_sleeping(false);
526 return (!is_done());
527 }
528
529 int RGWAsyncGetBucketInstanceInfo::_send_request()
530 {
531 RGWSysObjectCtx obj_ctx = store->svc.sysobj->init_obj_ctx();
532 int r = store->get_bucket_instance_from_oid(obj_ctx, oid, bucket_info, NULL, NULL);
533 if (r < 0) {
534 ldout(store->ctx(), 0) << "ERROR: failed to get bucket instance info for "
535 << oid << dendl;
536 return r;
537 }
538
539 return 0;
540 }
541
542 RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(RGWRados *store,
543 const RGWBucketInfo& bucket_info,
544 int shard_id,
545 const std::string& start_marker,
546 const std::string& end_marker)
547 : RGWSimpleCoroutine(store->ctx()), bs(store),
548 start_marker(BucketIndexShardsManager::get_shard_marker(start_marker)),
549 end_marker(BucketIndexShardsManager::get_shard_marker(end_marker))
550 {
551 bs.init(bucket_info, shard_id);
552 }
553
554 int RGWRadosBILogTrimCR::send_request()
555 {
556 bufferlist in;
557 cls_rgw_bi_log_trim_op call;
558 call.start_marker = std::move(start_marker);
559 call.end_marker = std::move(end_marker);
560 encode(call, in);
561
562 librados::ObjectWriteOperation op;
563 op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
564
565 cn = stack->create_completion_notifier();
566 return bs.index_ctx.aio_operate(bs.bucket_obj, cn->completion(), &op);
567 }
568
569 int RGWRadosBILogTrimCR::request_complete()
570 {
571 int r = cn->completion()->get_return_value();
572 set_status() << "request complete; ret=" << r;
573 return r;
574 }
575
576 int RGWAsyncFetchRemoteObj::_send_request()
577 {
578 RGWObjectCtx obj_ctx(store);
579
580 string user_id;
581 char buf[16];
582 snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
583 map<string, bufferlist> attrs;
584
585 rgw_obj src_obj(bucket_info.bucket, key);
586
587 rgw_obj dest_obj(bucket_info.bucket, dest_key.value_or(key));
588
589 std::optional<uint64_t> bytes_transferred;
590 int r = store->fetch_remote_obj(obj_ctx,
591 user_id,
592 NULL, /* req_info */
593 source_zone,
594 dest_obj,
595 src_obj,
596 bucket_info, /* dest */
597 bucket_info, /* source */
598 dest_placement_rule,
599 NULL, /* real_time* src_mtime, */
600 NULL, /* real_time* mtime, */
601 NULL, /* const real_time* mod_ptr, */
602 NULL, /* const real_time* unmod_ptr, */
603 false, /* high precision time */
604 NULL, /* const char *if_match, */
605 NULL, /* const char *if_nomatch, */
606 RGWRados::ATTRSMOD_NONE,
607 copy_if_newer,
608 attrs,
609 RGWObjCategory::Main,
610 versioned_epoch,
611 real_time(), /* delete_at */
612 NULL, /* string *ptag, */
613 NULL, /* string *petag, */
614 NULL, /* void (*progress_cb)(off_t, void *), */
615 NULL, /* void *progress_data*); */
616 &zones_trace,
617 &bytes_transferred);
618
619 if (r < 0) {
620 ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
621 if (counters) {
622 counters->inc(sync_counters::l_fetch_err, 1);
623 }
624 } else if (counters) {
625 if (bytes_transferred) {
626 counters->inc(sync_counters::l_fetch, *bytes_transferred);
627 } else {
628 counters->inc(sync_counters::l_fetch_not_modified);
629 }
630 }
631 return r;
632 }
633
634 int RGWAsyncStatRemoteObj::_send_request()
635 {
636 RGWObjectCtx obj_ctx(store);
637
638 string user_id;
639 char buf[16];
640 snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
641
642 rgw_obj src_obj(bucket_info.bucket, key);
643
644 rgw_obj dest_obj(src_obj);
645
646 int r = store->stat_remote_obj(obj_ctx,
647 user_id,
648 nullptr, /* req_info */
649 source_zone,
650 src_obj,
651 bucket_info, /* source */
652 pmtime, /* real_time* src_mtime, */
653 psize, /* uint64_t * */
654 nullptr, /* const real_time* mod_ptr, */
655 nullptr, /* const real_time* unmod_ptr, */
656 true, /* high precision time */
657 nullptr, /* const char *if_match, */
658 nullptr, /* const char *if_nomatch, */
659 pattrs,
660 pheaders,
661 nullptr,
662 nullptr, /* string *ptag, */
663 petag); /* string *petag, */
664
665 if (r < 0) {
666 ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
667 }
668 return r;
669 }
670
671
672 int RGWAsyncRemoveObj::_send_request()
673 {
674 RGWObjectCtx obj_ctx(store);
675
676 rgw_obj obj(bucket_info.bucket, key);
677
678 ldout(store->ctx(), 0) << __func__ << "(): deleting obj=" << obj << dendl;
679
680 obj_ctx.set_atomic(obj);
681
682 RGWObjState *state;
683
684 int ret = store->get_obj_state(&obj_ctx, bucket_info, obj, &state);
685 if (ret < 0) {
686 ldout(store->ctx(), 20) << __func__ << "(): get_obj_state() obj=" << obj << " returned ret=" << ret << dendl;
687 return ret;
688 }
689
690 /* has there been any racing object write? */
691 if (del_if_older && (state->mtime > timestamp)) {
692 ldout(store->ctx(), 20) << __func__ << "(): skipping object removal obj=" << obj << " (obj mtime=" << state->mtime << ", request timestamp=" << timestamp << ")" << dendl;
693 return 0;
694 }
695
696 RGWAccessControlPolicy policy;
697
698 /* decode policy */
699 map<string, bufferlist>::iterator iter = state->attrset.find(RGW_ATTR_ACL);
700 if (iter != state->attrset.end()) {
701 auto bliter = iter->second.cbegin();
702 try {
703 policy.decode(bliter);
704 } catch (buffer::error& err) {
705 ldout(store->ctx(), 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
706 return -EIO;
707 }
708 }
709
710 RGWRados::Object del_target(store, bucket_info, obj_ctx, obj);
711 RGWRados::Object::Delete del_op(&del_target);
712
713 del_op.params.bucket_owner = bucket_info.owner;
714 del_op.params.obj_owner = policy.get_owner();
715 if (del_if_older) {
716 del_op.params.unmod_since = timestamp;
717 }
718 if (versioned) {
719 del_op.params.versioning_status = BUCKET_VERSIONED;
720 }
721 del_op.params.olh_epoch = versioned_epoch;
722 del_op.params.marker_version_id = marker_version_id;
723 del_op.params.obj_owner.set_id(owner);
724 del_op.params.obj_owner.set_name(owner_display_name);
725 del_op.params.mtime = timestamp;
726 del_op.params.high_precision_time = true;
727 del_op.params.zones_trace = &zones_trace;
728
729 ret = del_op.delete_obj();
730 if (ret < 0) {
731 ldout(store->ctx(), 20) << __func__ << "(): delete_obj() obj=" << obj << " returned ret=" << ret << dendl;
732 }
733 return ret;
734 }
735
// Coroutine body that keeps a lock on `obj` alive (stackless coroutine built
// on the reenter/yield macros, so statement order and yield placement matter).
// While not going down it (re)takes the lock for `interval` seconds via
// RGWSimpleRadosLockCR, marks the lease as locked, and waits half the interval
// before renewing. A failed lock attempt marks the lease unlocked and ends the
// coroutine in the error state with that retcode. On shutdown the lock is
// released via RGWSimpleRadosUnlockCR and the coroutine finishes as done.
// If `aborted` is set on entry, the caller is woken and the coroutine is
// completed immediately.
736 int RGWContinuousLeaseCR::operate()
737 {
738 if (aborted) {
739 caller->set_sleeping(false);
740 return set_cr_done();
741 }
742 reenter(this) {
743 while (!going_down) {
744 yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
745
746 caller->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
747 if (retcode < 0) {
748 set_locked(false);
749 ldout(store->ctx(), 20) << *this << ": couldn't lock " << obj << ":" << lock_name << ": retcode=" << retcode << dendl;
750 return set_state(RGWCoroutine_Error, retcode);
751 }
752 set_locked(true);
// Renew after half the lock duration has elapsed.
753 yield wait(utime_t(interval / 2, 0));
754 }
755 set_locked(false); /* moot at this point anyway */
756 yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie));
757 return set_state(RGWCoroutine_Done);
758 }
759 return 0;
760 }
761
762 RGWRadosTimelogAddCR::RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
763 const cls_log_entry& entry) : RGWSimpleCoroutine(_store->ctx()),
764 store(_store),
765 oid(_oid), cn(NULL)
766 {
767 stringstream& s = set_description();
768 s << "timelog add entry oid=" << oid << "entry={id=" << entry.id << ", section=" << entry.section << ", name=" << entry.name << "}";
769 entries.push_back(entry);
770 }
771
772 int RGWRadosTimelogAddCR::send_request()
773 {
774 set_status() << "sending request";
775
776 cn = stack->create_completion_notifier();
777 return store->time_log_add(oid, entries, cn->completion(), true);
778 }
779
780 int RGWRadosTimelogAddCR::request_complete()
781 {
782 int r = cn->completion()->get_return_value();
783
784 set_status() << "request complete; ret=" << r;
785
786 return r;
787 }
788
789 RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(RGWRados *store,
790 const std::string& oid,
791 const real_time& start_time,
792 const real_time& end_time,
793 const std::string& from_marker,
794 const std::string& to_marker)
795 : RGWSimpleCoroutine(store->ctx()), store(store), oid(oid),
796 start_time(start_time), end_time(end_time),
797 from_marker(from_marker), to_marker(to_marker)
798 {
799 set_description() << "timelog trim oid=" << oid
800 << " start_time=" << start_time << " end_time=" << end_time
801 << " from_marker=" << from_marker << " to_marker=" << to_marker;
802 }
803
804 int RGWRadosTimelogTrimCR::send_request()
805 {
806 set_status() << "sending request";
807
808 cn = stack->create_completion_notifier();
809 return store->time_log_trim(oid, start_time, end_time, from_marker,
810 to_marker, cn->completion());
811 }
812
813 int RGWRadosTimelogTrimCR::request_complete()
814 {
815 int r = cn->completion()->get_return_value();
816
817 set_status() << "request complete; ret=" << r;
818
819 return r;
820 }
821
822
823 RGWSyncLogTrimCR::RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
824 const std::string& to_marker,
825 std::string *last_trim_marker)
826 : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{},
827 std::string{}, to_marker),
828 cct(store->ctx()), last_trim_marker(last_trim_marker)
829 {
830 }
831
832 int RGWSyncLogTrimCR::request_complete()
833 {
834 int r = RGWRadosTimelogTrimCR::request_complete();
835 if (r != -ENODATA) {
836 return r;
837 }
838 // nothing left to trim, update last_trim_marker
839 if (*last_trim_marker < to_marker) {
840 *last_trim_marker = to_marker;
841 }
842 return 0;
843 }
844
845
846 int RGWAsyncStatObj::_send_request()
847 {
848 rgw_raw_obj raw_obj;
849 store->obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
850 return store->raw_obj_stat(raw_obj, psize, pmtime, pepoch,
851 nullptr, nullptr, objv_tracker);
852 }
853
854 RGWStatObjCR::RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
855 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize,
856 real_time* pmtime, uint64_t *pepoch,
857 RGWObjVersionTracker *objv_tracker)
858 : RGWSimpleCoroutine(store->ctx()), store(store), async_rados(async_rados),
859 bucket_info(_bucket_info), obj(obj), psize(psize), pmtime(pmtime), pepoch(pepoch),
860 objv_tracker(objv_tracker)
861 {
862 }
863
864 void RGWStatObjCR::request_cleanup()
865 {
866 if (req) {
867 req->finish();
868 req = NULL;
869 }
870 }
871
872 int RGWStatObjCR::send_request()
873 {
874 req = new RGWAsyncStatObj(this, stack->create_completion_notifier(),
875 store, bucket_info, obj, psize, pmtime, pepoch, objv_tracker);
876 async_rados->queue(req);
877 return 0;
878 }
879
880 int RGWStatObjCR::request_complete()
881 {
882 return req->get_ret_status();
883 }
884
885 RGWRadosNotifyCR::RGWRadosNotifyCR(RGWRados *store, const rgw_raw_obj& obj,
886 bufferlist& request, uint64_t timeout_ms,
887 bufferlist *response)
888 : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj),
889 request(request), timeout_ms(timeout_ms), response(response)
890 {
891 set_description() << "notify dest=" << obj;
892 }
893
894 int RGWRadosNotifyCR::send_request()
895 {
896 int r = store->get_raw_obj_ref(obj, &ref);
897 if (r < 0) {
898 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
899 return r;
900 }
901
902 set_status() << "sending request";
903
904 cn = stack->create_completion_notifier();
905 return ref.ioctx.aio_notify(ref.obj.oid, cn->completion(), request,
906 timeout_ms, response);
907 }
908
909 int RGWRadosNotifyCR::request_complete()
910 {
911 int r = cn->completion()->get_return_value();
912
913 set_status() << "request complete; ret=" << r;
914
915 return r;
916 }