]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rados.cc
21f4a00f47a68f9fa0af5620cf22b1772fdfb9cd
[ceph.git] / ceph / src / rgw / rgw_cr_rados.cc
1 #include "rgw_rados.h"
2 #include "rgw_coroutine.h"
3 #include "rgw_cr_rados.h"
4
5 #include "cls/lock/cls_lock_client.h"
6
7 #include <boost/asio/yield.hpp>
8
9 #define dout_context g_ceph_context
10 #define dout_subsys ceph_subsys_rgw
11
12 bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest *req) {
13 if (processor->is_going_down()) {
14 return false;
15 }
16 req->get();
17 processor->m_req_queue.push_back(req);
18 dout(20) << "enqueued request req=" << hex << req << dec << dendl;
19 _dump_queue();
20 return true;
21 }
22
23 bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
24 return processor->m_req_queue.empty();
25 }
26
27 RGWAsyncRadosRequest *RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
28 if (processor->m_req_queue.empty())
29 return NULL;
30 RGWAsyncRadosRequest *req = processor->m_req_queue.front();
31 processor->m_req_queue.pop_front();
32 dout(20) << "dequeued request req=" << hex << req << dec << dendl;
33 _dump_queue();
34 return req;
35 }
36
37 void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) {
38 processor->handle_request(req);
39 processor->req_throttle.put(1);
40 }
41
42 void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
43 if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
44 return;
45 }
46 deque<RGWAsyncRadosRequest *>::iterator iter;
47 if (processor->m_req_queue.empty()) {
48 dout(20) << "RGWWQ: empty" << dendl;
49 return;
50 }
51 dout(20) << "RGWWQ:" << dendl;
52 for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) {
53 dout(20) << "req: " << hex << *iter << dec << dendl;
54 }
55 }
56
57 RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(RGWRados *_store, int num_threads)
58 : store(_store), m_tp(store->ctx(), "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
59 req_throttle(store->ctx(), "rgw_async_rados_ops", num_threads * 2),
60 req_wq(this, g_conf->rgw_op_thread_timeout,
61 g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
62 }
63
64 void RGWAsyncRadosProcessor::start() {
65 m_tp.start();
66 }
67
68 void RGWAsyncRadosProcessor::stop() {
69 going_down = true;
70 m_tp.drain(&req_wq);
71 m_tp.stop();
72 for (auto iter = m_req_queue.begin(); iter != m_req_queue.end(); ++iter) {
73 (*iter)->put();
74 }
75 }
76
77 void RGWAsyncRadosProcessor::handle_request(RGWAsyncRadosRequest *req) {
78 req->send_request();
79 req->put();
80 }
81
82 void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest *req) {
83 req_throttle.get(1);
84 req_wq.queue(req);
85 }
86
87 int RGWAsyncGetSystemObj::_send_request()
88 {
89 return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, pattrs, NULL);
90 }
91
92 RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
93 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
94 bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(caller, cn), store(_store), obj_ctx(_obj_ctx),
95 objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl), pattrs(NULL),
96 ofs(_ofs), end(_end)
97 {
98 }
99
100 int RGWSimpleRadosReadAttrsCR::send_request()
101 {
102 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
103 store, &obj_ctx, NULL,
104 obj,
105 &bl, 0, -1);
106 if (pattrs) {
107 req->set_read_attrs(pattrs);
108 }
109 async_rados->queue(req);
110 return 0;
111 }
112
113 int RGWSimpleRadosReadAttrsCR::request_complete()
114 {
115 return req->get_ret_status();
116 }
117
118 int RGWAsyncPutSystemObj::_send_request()
119 {
120 return store->put_system_obj_data(NULL, obj, bl, -1, exclusive, objv_tracker);
121 }
122
123 RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
124 RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
125 bool _exclusive, bufferlist& _bl)
126 : RGWAsyncRadosRequest(caller, cn), store(_store), objv_tracker(_objv_tracker),
127 obj(_obj), exclusive(_exclusive), bl(_bl)
128 {
129 }
130
131 int RGWAsyncPutSystemObjAttrs::_send_request()
132 {
133 return store->system_obj_set_attrs(NULL, obj, *attrs, NULL, objv_tracker);
134 }
135
136 RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
137 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
138 map<string, bufferlist> *_attrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
139 objv_tracker(_objv_tracker), obj(_obj),
140 attrs(_attrs)
141 {
142 }
143
144
145 RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj,
146 uint64_t _window_size)
147 : RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
148 store(_store), obj(_obj), going_down(false), num_pending_entries(0), window_size(_window_size), total_entries(0)
149 {
150 }
151
152 int RGWAsyncLockSystemObj::_send_request()
153 {
154 rgw_rados_ref ref;
155 int r = store->get_raw_obj_ref(obj, &ref);
156 if (r < 0) {
157 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
158 return r;
159 }
160
161 rados::cls::lock::Lock l(lock_name);
162 utime_t duration(duration_secs, 0);
163 l.set_duration(duration);
164 l.set_cookie(cookie);
165 l.set_renew(true);
166
167 return l.lock_exclusive(&ref.ioctx, ref.oid);
168 }
169
170 RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
171 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
172 const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(caller, cn), store(_store),
173 obj(_obj),
174 lock_name(_name),
175 cookie(_cookie),
176 duration_secs(_duration_secs)
177 {
178 }
179
180 int RGWAsyncUnlockSystemObj::_send_request()
181 {
182 rgw_rados_ref ref;
183 int r = store->get_raw_obj_ref(obj, &ref);
184 if (r < 0) {
185 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
186 return r;
187 }
188
189 rados::cls::lock::Lock l(lock_name);
190
191 l.set_cookie(cookie);
192
193 return l.unlock(&ref.ioctx, ref.oid);
194 }
195
196 RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
197 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
198 const string& _name, const string& _cookie) : RGWAsyncRadosRequest(caller, cn), store(_store),
199 obj(_obj),
200 lock_name(_name), cookie(_cookie)
201 {
202 }
203
204
205 RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(RGWRados *_store,
206 const rgw_raw_obj& _obj,
207 map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
208 store(_store),
209 entries(_entries),
210 obj(_obj), cn(NULL)
211 {
212 stringstream& s = set_description();
213 s << "set omap keys dest=" << obj << " keys=[" << s.str() << "]";
214 for (auto i = entries.begin(); i != entries.end(); ++i) {
215 if (i != entries.begin()) {
216 s << ", ";
217 }
218 s << i->first;
219 }
220 s << "]";
221 }
222
223 RGWRadosSetOmapKeysCR::~RGWRadosSetOmapKeysCR()
224 {
225 if (cn) {
226 cn->put();
227 }
228 }
229
230 int RGWRadosSetOmapKeysCR::send_request()
231 {
232 int r = store->get_raw_obj_ref(obj, &ref);
233 if (r < 0) {
234 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
235 return r;
236 }
237
238 set_status() << "sending request";
239
240 librados::ObjectWriteOperation op;
241 op.omap_set(entries);
242
243 cn = stack->create_completion_notifier();
244 cn->get();
245 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op);
246 }
247
248 int RGWRadosSetOmapKeysCR::request_complete()
249 {
250 int r = cn->completion()->get_return_value();
251
252 set_status() << "request complete; ret=" << r;
253
254 return r;
255 }
256
257 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store,
258 const rgw_raw_obj& _obj,
259 const string& _marker,
260 map<string, bufferlist> *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()),
261 store(_store),
262 marker(_marker),
263 entries(_entries), max_entries(_max_entries), rval(0),
264 obj(_obj), cn(NULL)
265 {
266 set_description() << "set omap keys dest=" << obj << " marker=" << marker;
267 }
268
269 RGWRadosGetOmapKeysCR::~RGWRadosGetOmapKeysCR()
270 {
271 }
272
273 int RGWRadosGetOmapKeysCR::send_request() {
274 int r = store->get_raw_obj_ref(obj, &ref);
275 if (r < 0) {
276 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
277 return r;
278 }
279
280 set_status() << "send request";
281
282 librados::ObjectReadOperation op;
283 op.omap_get_vals2(marker, max_entries, entries, nullptr, &rval);
284
285 cn = stack->create_completion_notifier();
286 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op, NULL);
287 }
288
289 RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(RGWRados *_store,
290 const rgw_raw_obj& _obj,
291 const set<string>& _keys) : RGWSimpleCoroutine(_store->ctx()),
292 store(_store),
293 keys(_keys),
294 obj(_obj), cn(NULL)
295 {
296 set_description() << "remove omap keys dest=" << obj << " keys=" << keys;
297 }
298
299 RGWRadosRemoveOmapKeysCR::~RGWRadosRemoveOmapKeysCR()
300 {
301 }
302
303 int RGWRadosRemoveOmapKeysCR::send_request() {
304 int r = store->get_raw_obj_ref(obj, &ref);
305 if (r < 0) {
306 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
307 return r;
308 }
309
310 set_status() << "send request";
311
312 librados::ObjectWriteOperation op;
313 op.omap_rm_keys(keys);
314
315 cn = stack->create_completion_notifier();
316 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op);
317 }
318
319 RGWRadosRemoveCR::RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj)
320 : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj)
321 {
322 set_description() << "remove dest=" << obj;
323 }
324
325 int RGWRadosRemoveCR::send_request()
326 {
327 auto rados = store->get_rados_handle();
328 int r = rados->ioctx_create(obj.pool.name.c_str(), ioctx);
329 if (r < 0) {
330 lderr(cct) << "ERROR: failed to open pool (" << obj.pool.name << ") ret=" << r << dendl;
331 return r;
332 }
333 ioctx.locator_set_key(obj.loc);
334
335 set_status() << "send request";
336
337 librados::ObjectWriteOperation op;
338 op.remove();
339
340 cn = stack->create_completion_notifier();
341 return ioctx.aio_operate(obj.oid, cn->completion(), &op);
342 }
343
344 int RGWRadosRemoveCR::request_complete()
345 {
346 int r = cn->completion()->get_return_value();
347
348 set_status() << "request complete; ret=" << r;
349
350 return r;
351 }
352
353 RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
354 const rgw_raw_obj& _obj,
355 const string& _lock_name,
356 const string& _cookie,
357 uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
358 async_rados(_async_rados),
359 store(_store),
360 lock_name(_lock_name),
361 cookie(_cookie),
362 duration(_duration),
363 obj(_obj),
364 req(NULL)
365 {
366 set_description() << "rados lock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie << " duration=" << duration;
367 }
368
369 void RGWSimpleRadosLockCR::request_cleanup()
370 {
371 if (req) {
372 req->finish();
373 req = NULL;
374 }
375 }
376
377 int RGWSimpleRadosLockCR::send_request()
378 {
379 set_status() << "sending request";
380 req = new RGWAsyncLockSystemObj(this, stack->create_completion_notifier(),
381 store, NULL, obj, lock_name, cookie, duration);
382 async_rados->queue(req);
383 return 0;
384 }
385
386 int RGWSimpleRadosLockCR::request_complete()
387 {
388 set_status() << "request complete; ret=" << req->get_ret_status();
389 return req->get_ret_status();
390 }
391
392 RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
393 const rgw_raw_obj& _obj,
394 const string& _lock_name,
395 const string& _cookie) : RGWSimpleCoroutine(_store->ctx()),
396 async_rados(_async_rados),
397 store(_store),
398 lock_name(_lock_name),
399 cookie(_cookie),
400 obj(_obj),
401 req(NULL)
402 {
403 set_description() << "rados unlock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie;
404 }
405
406 void RGWSimpleRadosUnlockCR::request_cleanup()
407 {
408 if (req) {
409 req->finish();
410 req = NULL;
411 }
412 }
413
414 int RGWSimpleRadosUnlockCR::send_request()
415 {
416 set_status() << "sending request";
417
418 req = new RGWAsyncUnlockSystemObj(this, stack->create_completion_notifier(),
419 store, NULL, obj, lock_name, cookie);
420 async_rados->queue(req);
421 return 0;
422 }
423
424 int RGWSimpleRadosUnlockCR::request_complete()
425 {
426 set_status() << "request complete; ret=" << req->get_ret_status();
427 return req->get_ret_status();
428 }
429
430
431 int RGWOmapAppend::operate() {
432 reenter(this) {
433 for (;;) {
434 if (!has_product() && going_down) {
435 set_status() << "going down";
436 break;
437 }
438 set_status() << "waiting for product";
439 yield wait_for_product();
440 yield {
441 string entry;
442 while (consume(&entry)) {
443 set_status() << "adding entry: " << entry;
444 entries[entry] = bufferlist();
445 if (entries.size() >= window_size) {
446 break;
447 }
448 }
449 if (entries.size() >= window_size || going_down) {
450 set_status() << "flushing to omap";
451 call(new RGWRadosSetOmapKeysCR(store, obj, entries));
452 entries.clear();
453 }
454 }
455 if (get_ret_status() < 0) {
456 ldout(cct, 0) << "ERROR: failed to store entries in omap" << dendl;
457 return set_state(RGWCoroutine_Error);
458 }
459 }
460 /* done with coroutine */
461 return set_state(RGWCoroutine_Done);
462 }
463 return 0;
464 }
465
466 void RGWOmapAppend::flush_pending() {
467 receive(pending_entries);
468 num_pending_entries = 0;
469 }
470
471 bool RGWOmapAppend::append(const string& s) {
472 if (is_done()) {
473 return false;
474 }
475 ++total_entries;
476 pending_entries.push_back(s);
477 if (++num_pending_entries >= (int)window_size) {
478 flush_pending();
479 }
480 return true;
481 }
482
483 bool RGWOmapAppend::finish() {
484 going_down = true;
485 flush_pending();
486 set_sleeping(false);
487 return (!is_done());
488 }
489
490 int RGWAsyncGetBucketInstanceInfo::_send_request()
491 {
492 RGWObjectCtx obj_ctx(store);
493 int r = store->get_bucket_instance_info(obj_ctx, bucket, *bucket_info, NULL, NULL);
494 if (r < 0) {
495 ldout(store->ctx(), 0) << "ERROR: failed to get bucket instance info for "
496 << bucket << dendl;
497 return r;
498 }
499
500 return 0;
501 }
502
503 int RGWAsyncFetchRemoteObj::_send_request()
504 {
505 RGWObjectCtx obj_ctx(store);
506
507 string user_id;
508 char buf[16];
509 snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
510 string client_id = store->zone_id() + buf;
511 string op_id = store->unique_id(store->get_new_req_id());
512 map<string, bufferlist> attrs;
513
514 rgw_obj src_obj(bucket_info.bucket, key);
515
516 rgw_obj dest_obj(src_obj);
517
518 int r = store->fetch_remote_obj(obj_ctx,
519 user_id,
520 client_id,
521 op_id,
522 false, /* don't record op state in ops log */
523 NULL, /* req_info */
524 source_zone,
525 dest_obj,
526 src_obj,
527 bucket_info, /* dest */
528 bucket_info, /* source */
529 NULL, /* real_time* src_mtime, */
530 NULL, /* real_time* mtime, */
531 NULL, /* const real_time* mod_ptr, */
532 NULL, /* const real_time* unmod_ptr, */
533 false, /* high precision time */
534 NULL, /* const char *if_match, */
535 NULL, /* const char *if_nomatch, */
536 RGWRados::ATTRSMOD_NONE,
537 copy_if_newer,
538 attrs,
539 RGW_OBJ_CATEGORY_MAIN,
540 versioned_epoch,
541 real_time(), /* delete_at */
542 &key.instance, /* string *version_id, */
543 NULL, /* string *ptag, */
544 NULL, /* string *petag, */
545 NULL, /* void (*progress_cb)(off_t, void *), */
546 NULL, /* void *progress_data*); */
547 zones_trace);
548
549 if (r < 0) {
550 ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
551 }
552 return r;
553 }
554
555 int RGWAsyncStatRemoteObj::_send_request()
556 {
557 RGWObjectCtx obj_ctx(store);
558
559 string user_id;
560 char buf[16];
561 snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
562 string client_id = store->zone_id() + buf;
563 string op_id = store->unique_id(store->get_new_req_id());
564
565 rgw_obj src_obj(bucket_info.bucket, key);
566
567 rgw_obj dest_obj(src_obj);
568
569 int r = store->stat_remote_obj(obj_ctx,
570 user_id,
571 client_id,
572 nullptr, /* req_info */
573 source_zone,
574 src_obj,
575 bucket_info, /* source */
576 pmtime, /* real_time* src_mtime, */
577 psize, /* uint64_t * */
578 nullptr, /* const real_time* mod_ptr, */
579 nullptr, /* const real_time* unmod_ptr, */
580 true, /* high precision time */
581 nullptr, /* const char *if_match, */
582 nullptr, /* const char *if_nomatch, */
583 pattrs,
584 nullptr,
585 nullptr, /* string *ptag, */
586 nullptr); /* string *petag, */
587
588 if (r < 0) {
589 ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
590 }
591 return r;
592 }
593
594
595 int RGWAsyncRemoveObj::_send_request()
596 {
597 RGWObjectCtx obj_ctx(store);
598
599 rgw_obj obj(bucket_info.bucket, key);
600
601 ldout(store->ctx(), 0) << __func__ << "(): deleting obj=" << obj << dendl;
602
603 obj_ctx.obj.set_atomic(obj);
604
605 RGWObjState *state;
606
607 int ret = store->get_obj_state(&obj_ctx, bucket_info, obj, &state);
608 if (ret < 0) {
609 ldout(store->ctx(), 20) << __func__ << "(): get_obj_state() obj=" << obj << " returned ret=" << ret << dendl;
610 return ret;
611 }
612
613 /* has there been any racing object write? */
614 if (del_if_older && (state->mtime > timestamp)) {
615 ldout(store->ctx(), 20) << __func__ << "(): skipping object removal obj=" << obj << " (obj mtime=" << state->mtime << ", request timestamp=" << timestamp << ")" << dendl;
616 return 0;
617 }
618
619 RGWAccessControlPolicy policy;
620
621 /* decode policy */
622 map<string, bufferlist>::iterator iter = state->attrset.find(RGW_ATTR_ACL);
623 if (iter != state->attrset.end()) {
624 bufferlist::iterator bliter = iter->second.begin();
625 try {
626 policy.decode(bliter);
627 } catch (buffer::error& err) {
628 ldout(store->ctx(), 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
629 return -EIO;
630 }
631 }
632
633 RGWRados::Object del_target(store, bucket_info, obj_ctx, obj);
634 RGWRados::Object::Delete del_op(&del_target);
635
636 del_op.params.bucket_owner = bucket_info.owner;
637 del_op.params.obj_owner = policy.get_owner();
638 if (del_if_older) {
639 del_op.params.unmod_since = timestamp;
640 }
641 if (versioned) {
642 del_op.params.versioning_status = BUCKET_VERSIONED;
643 }
644 del_op.params.olh_epoch = versioned_epoch;
645 del_op.params.marker_version_id = marker_version_id;
646 del_op.params.obj_owner.set_id(owner);
647 del_op.params.obj_owner.set_name(owner_display_name);
648 del_op.params.mtime = timestamp;
649 del_op.params.high_precision_time = true;
650 del_op.params.zones_trace = zones_trace;
651
652 ret = del_op.delete_obj();
653 if (ret < 0) {
654 ldout(store->ctx(), 20) << __func__ << "(): delete_obj() obj=" << obj << " returned ret=" << ret << dendl;
655 }
656 return ret;
657 }
658
659 int RGWContinuousLeaseCR::operate()
660 {
661 if (aborted) {
662 caller->set_sleeping(false);
663 return set_cr_done();
664 }
665 reenter(this) {
666 while (!going_down) {
667 yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
668
669 caller->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
670 if (retcode < 0) {
671 set_locked(false);
672 ldout(store->ctx(), 20) << *this << ": couldn't lock " << obj << ":" << lock_name << ": retcode=" << retcode << dendl;
673 return set_state(RGWCoroutine_Error, retcode);
674 }
675 set_locked(true);
676 yield wait(utime_t(interval / 2, 0));
677 }
678 set_locked(false); /* moot at this point anyway */
679 yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie));
680 return set_state(RGWCoroutine_Done);
681 }
682 return 0;
683 }
684
685 RGWRadosTimelogAddCR::RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
686 const cls_log_entry& entry) : RGWSimpleCoroutine(_store->ctx()),
687 store(_store),
688 oid(_oid), cn(NULL)
689 {
690 stringstream& s = set_description();
691 s << "timelog add entry oid=" << oid << "entry={id=" << entry.id << ", section=" << entry.section << ", name=" << entry.name << "}";
692 entries.push_back(entry);
693 }
694
695 RGWRadosTimelogAddCR::~RGWRadosTimelogAddCR()
696 {
697 if (cn) {
698 cn->put();
699 }
700 }
701
702 int RGWRadosTimelogAddCR::send_request()
703 {
704 set_status() << "sending request";
705
706 cn = stack->create_completion_notifier();
707 cn->get();
708 return store->time_log_add(oid, entries, cn->completion(), true);
709 }
710
711 int RGWRadosTimelogAddCR::request_complete()
712 {
713 int r = cn->completion()->get_return_value();
714
715 set_status() << "request complete; ret=" << r;
716
717 return r;
718 }
719
720 RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(RGWRados *store,
721 const std::string& oid,
722 const real_time& start_time,
723 const real_time& end_time,
724 const std::string& from_marker,
725 const std::string& to_marker)
726 : RGWSimpleCoroutine(store->ctx()), store(store), oid(oid),
727 start_time(start_time), end_time(end_time),
728 from_marker(from_marker), to_marker(to_marker)
729 {
730 set_description() << "timelog trim oid=" << oid
731 << " start_time=" << start_time << " end_time=" << end_time
732 << " from_marker=" << from_marker << " to_marker=" << to_marker;
733 }
734
735 RGWRadosTimelogTrimCR::~RGWRadosTimelogTrimCR()
736 {
737 if (cn) {
738 cn->put();
739 }
740 }
741
742 int RGWRadosTimelogTrimCR::send_request()
743 {
744 set_status() << "sending request";
745
746 cn = stack->create_completion_notifier();
747 cn->get();
748 return store->time_log_trim(oid, start_time, end_time, from_marker,
749 to_marker, cn->completion());
750 }
751
752 int RGWRadosTimelogTrimCR::request_complete()
753 {
754 int r = cn->completion()->get_return_value();
755
756 set_status() << "request complete; ret=" << r;
757
758 return r;
759 }
760
761
762 RGWSyncLogTrimCR::RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
763 const std::string& to_marker,
764 std::string *last_trim_marker)
765 : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{},
766 std::string{}, to_marker),
767 cct(store->ctx()), last_trim_marker(last_trim_marker)
768 {
769 }
770
771 int RGWSyncLogTrimCR::request_complete()
772 {
773 int r = RGWRadosTimelogTrimCR::request_complete();
774 if (r < 0 && r != -ENODATA) {
775 return r;
776 }
777 if (*last_trim_marker < to_marker) {
778 *last_trim_marker = to_marker;
779 }
780 return 0;
781 }
782
783
784 int RGWAsyncStatObj::_send_request()
785 {
786 rgw_raw_obj raw_obj;
787 store->obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
788 return store->raw_obj_stat(raw_obj, psize, pmtime, pepoch,
789 nullptr, nullptr, objv_tracker);
790 }
791
792 RGWStatObjCR::RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
793 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize,
794 real_time* pmtime, uint64_t *pepoch,
795 RGWObjVersionTracker *objv_tracker)
796 : RGWSimpleCoroutine(store->ctx()), store(store), async_rados(async_rados),
797 bucket_info(_bucket_info), obj(obj), psize(psize), pmtime(pmtime), pepoch(pepoch),
798 objv_tracker(objv_tracker)
799 {
800 }
801
802 void RGWStatObjCR::request_cleanup()
803 {
804 if (req) {
805 req->finish();
806 req = NULL;
807 }
808 }
809
810 int RGWStatObjCR::send_request()
811 {
812 req = new RGWAsyncStatObj(this, stack->create_completion_notifier(),
813 store, bucket_info, obj, psize, pmtime, pepoch, objv_tracker);
814 async_rados->queue(req);
815 return 0;
816 }
817
818 int RGWStatObjCR::request_complete()
819 {
820 return req->get_ret_status();
821 }