]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rados.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rgw / rgw_cr_rados.cc
1 #include "rgw_rados.h"
2 #include "rgw_coroutine.h"
3 #include "rgw_boost_asio_yield.h"
4 #include "rgw_cr_rados.h"
5
6 #include "cls/lock/cls_lock_client.h"
7
8 #define dout_context g_ceph_context
9 #define dout_subsys ceph_subsys_rgw
10
11 bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest *req) {
12 if (processor->is_going_down()) {
13 return false;
14 }
15 req->get();
16 processor->m_req_queue.push_back(req);
17 dout(20) << "enqueued request req=" << hex << req << dec << dendl;
18 _dump_queue();
19 return true;
20 }
21
22 bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
23 return processor->m_req_queue.empty();
24 }
25
26 RGWAsyncRadosRequest *RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
27 if (processor->m_req_queue.empty())
28 return NULL;
29 RGWAsyncRadosRequest *req = processor->m_req_queue.front();
30 processor->m_req_queue.pop_front();
31 dout(20) << "dequeued request req=" << hex << req << dec << dendl;
32 _dump_queue();
33 return req;
34 }
35
36 void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) {
37 processor->handle_request(req);
38 processor->req_throttle.put(1);
39 }
40
41 void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
42 if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
43 return;
44 }
45 deque<RGWAsyncRadosRequest *>::iterator iter;
46 if (processor->m_req_queue.empty()) {
47 dout(20) << "RGWWQ: empty" << dendl;
48 return;
49 }
50 dout(20) << "RGWWQ:" << dendl;
51 for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) {
52 dout(20) << "req: " << hex << *iter << dec << dendl;
53 }
54 }
55
56 RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(RGWRados *_store, int num_threads)
57 : store(_store), m_tp(store->ctx(), "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
58 req_throttle(store->ctx(), "rgw_async_rados_ops", num_threads * 2),
59 req_wq(this, g_conf->rgw_op_thread_timeout,
60 g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
61 }
62
63 void RGWAsyncRadosProcessor::start() {
64 m_tp.start();
65 }
66
67 void RGWAsyncRadosProcessor::stop() {
68 going_down = true;
69 m_tp.drain(&req_wq);
70 m_tp.stop();
71 for (auto iter = m_req_queue.begin(); iter != m_req_queue.end(); ++iter) {
72 (*iter)->put();
73 }
74 }
75
76 void RGWAsyncRadosProcessor::handle_request(RGWAsyncRadosRequest *req) {
77 req->send_request();
78 req->put();
79 }
80
81 void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest *req) {
82 req_throttle.get(1);
83 req_wq.queue(req);
84 }
85
86 int RGWAsyncGetSystemObj::_send_request()
87 {
88 return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, pattrs, NULL);
89 }
90
91 RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
92 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
93 bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(caller, cn), store(_store), obj_ctx(_obj_ctx),
94 objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl), pattrs(NULL),
95 ofs(_ofs), end(_end)
96 {
97 }
98
99 int RGWSimpleRadosReadAttrsCR::send_request()
100 {
101 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
102 store, &obj_ctx, NULL,
103 obj,
104 &bl, 0, -1);
105 if (pattrs) {
106 req->set_read_attrs(pattrs);
107 }
108 async_rados->queue(req);
109 return 0;
110 }
111
112 int RGWSimpleRadosReadAttrsCR::request_complete()
113 {
114 return req->get_ret_status();
115 }
116
117 int RGWAsyncPutSystemObj::_send_request()
118 {
119 return store->put_system_obj_data(NULL, obj, bl, -1, exclusive, objv_tracker);
120 }
121
122 RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
123 RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
124 bool _exclusive, bufferlist& _bl)
125 : RGWAsyncRadosRequest(caller, cn), store(_store), objv_tracker(_objv_tracker),
126 obj(_obj), exclusive(_exclusive), bl(_bl)
127 {
128 }
129
130 int RGWAsyncPutSystemObjAttrs::_send_request()
131 {
132 return store->system_obj_set_attrs(NULL, obj, *attrs, NULL, objv_tracker);
133 }
134
135 RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
136 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
137 map<string, bufferlist> *_attrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
138 objv_tracker(_objv_tracker), obj(_obj),
139 attrs(_attrs)
140 {
141 }
142
143
144 RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj,
145 uint64_t _window_size)
146 : RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
147 store(_store), obj(_obj), going_down(false), num_pending_entries(0), window_size(_window_size), total_entries(0)
148 {
149 }
150
151 int RGWAsyncLockSystemObj::_send_request()
152 {
153 rgw_rados_ref ref;
154 int r = store->get_raw_obj_ref(obj, &ref);
155 if (r < 0) {
156 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
157 return r;
158 }
159
160 rados::cls::lock::Lock l(lock_name);
161 utime_t duration(duration_secs, 0);
162 l.set_duration(duration);
163 l.set_cookie(cookie);
164 l.set_renew(true);
165
166 return l.lock_exclusive(&ref.ioctx, ref.oid);
167 }
168
169 RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
170 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
171 const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(caller, cn), store(_store),
172 obj(_obj),
173 lock_name(_name),
174 cookie(_cookie),
175 duration_secs(_duration_secs)
176 {
177 }
178
179 int RGWAsyncUnlockSystemObj::_send_request()
180 {
181 rgw_rados_ref ref;
182 int r = store->get_raw_obj_ref(obj, &ref);
183 if (r < 0) {
184 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
185 return r;
186 }
187
188 rados::cls::lock::Lock l(lock_name);
189
190 l.set_cookie(cookie);
191
192 return l.unlock(&ref.ioctx, ref.oid);
193 }
194
195 RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
196 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
197 const string& _name, const string& _cookie) : RGWAsyncRadosRequest(caller, cn), store(_store),
198 obj(_obj),
199 lock_name(_name), cookie(_cookie)
200 {
201 }
202
203
204 RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(RGWRados *_store,
205 const rgw_raw_obj& _obj,
206 map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
207 store(_store),
208 entries(_entries),
209 obj(_obj), cn(NULL)
210 {
211 stringstream& s = set_description();
212 s << "set omap keys dest=" << obj << " keys=[" << s.str() << "]";
213 for (auto i = entries.begin(); i != entries.end(); ++i) {
214 if (i != entries.begin()) {
215 s << ", ";
216 }
217 s << i->first;
218 }
219 s << "]";
220 }
221
222 RGWRadosSetOmapKeysCR::~RGWRadosSetOmapKeysCR()
223 {
224 if (cn) {
225 cn->put();
226 }
227 }
228
229 int RGWRadosSetOmapKeysCR::send_request()
230 {
231 int r = store->get_raw_obj_ref(obj, &ref);
232 if (r < 0) {
233 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
234 return r;
235 }
236
237 set_status() << "sending request";
238
239 librados::ObjectWriteOperation op;
240 op.omap_set(entries);
241
242 cn = stack->create_completion_notifier();
243 cn->get();
244 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op);
245 }
246
247 int RGWRadosSetOmapKeysCR::request_complete()
248 {
249 int r = cn->completion()->get_return_value();
250
251 set_status() << "request complete; ret=" << r;
252
253 return r;
254 }
255
256 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store,
257 const rgw_raw_obj& _obj,
258 const string& _marker,
259 map<string, bufferlist> *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()),
260 store(_store),
261 marker(_marker),
262 entries(_entries), max_entries(_max_entries), rval(0),
263 obj(_obj), cn(NULL)
264 {
265 set_description() << "set omap keys dest=" << obj << " marker=" << marker;
266 }
267
268 RGWRadosGetOmapKeysCR::~RGWRadosGetOmapKeysCR()
269 {
270 }
271
272 int RGWRadosGetOmapKeysCR::send_request() {
273 int r = store->get_raw_obj_ref(obj, &ref);
274 if (r < 0) {
275 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
276 return r;
277 }
278
279 set_status() << "send request";
280
281 librados::ObjectReadOperation op;
282 op.omap_get_vals2(marker, max_entries, entries, nullptr, &rval);
283
284 cn = stack->create_completion_notifier();
285 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op, NULL);
286 }
287
288 RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(RGWRados *_store,
289 const rgw_raw_obj& _obj,
290 const set<string>& _keys) : RGWSimpleCoroutine(_store->ctx()),
291 store(_store),
292 keys(_keys),
293 obj(_obj), cn(NULL)
294 {
295 set_description() << "remove omap keys dest=" << obj << " keys=" << keys;
296 }
297
298 RGWRadosRemoveOmapKeysCR::~RGWRadosRemoveOmapKeysCR()
299 {
300 }
301
302 int RGWRadosRemoveOmapKeysCR::send_request() {
303 int r = store->get_raw_obj_ref(obj, &ref);
304 if (r < 0) {
305 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
306 return r;
307 }
308
309 set_status() << "send request";
310
311 librados::ObjectWriteOperation op;
312 op.omap_rm_keys(keys);
313
314 cn = stack->create_completion_notifier();
315 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op);
316 }
317
318 RGWRadosRemoveCR::RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj)
319 : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj)
320 {
321 set_description() << "remove dest=" << obj;
322 }
323
324 int RGWRadosRemoveCR::send_request()
325 {
326 auto rados = store->get_rados_handle();
327 int r = rados->ioctx_create(obj.pool.name.c_str(), ioctx);
328 if (r < 0) {
329 lderr(cct) << "ERROR: failed to open pool (" << obj.pool.name << ") ret=" << r << dendl;
330 return r;
331 }
332 ioctx.locator_set_key(obj.loc);
333
334 set_status() << "send request";
335
336 librados::ObjectWriteOperation op;
337 op.remove();
338
339 cn = stack->create_completion_notifier();
340 return ioctx.aio_operate(obj.oid, cn->completion(), &op);
341 }
342
343 int RGWRadosRemoveCR::request_complete()
344 {
345 int r = cn->completion()->get_return_value();
346
347 set_status() << "request complete; ret=" << r;
348
349 return r;
350 }
351
352 RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
353 const rgw_raw_obj& _obj,
354 const string& _lock_name,
355 const string& _cookie,
356 uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
357 async_rados(_async_rados),
358 store(_store),
359 lock_name(_lock_name),
360 cookie(_cookie),
361 duration(_duration),
362 obj(_obj),
363 req(NULL)
364 {
365 set_description() << "rados lock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie << " duration=" << duration;
366 }
367
368 void RGWSimpleRadosLockCR::request_cleanup()
369 {
370 if (req) {
371 req->finish();
372 req = NULL;
373 }
374 }
375
376 int RGWSimpleRadosLockCR::send_request()
377 {
378 set_status() << "sending request";
379 req = new RGWAsyncLockSystemObj(this, stack->create_completion_notifier(),
380 store, NULL, obj, lock_name, cookie, duration);
381 async_rados->queue(req);
382 return 0;
383 }
384
385 int RGWSimpleRadosLockCR::request_complete()
386 {
387 set_status() << "request complete; ret=" << req->get_ret_status();
388 return req->get_ret_status();
389 }
390
391 RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
392 const rgw_raw_obj& _obj,
393 const string& _lock_name,
394 const string& _cookie) : RGWSimpleCoroutine(_store->ctx()),
395 async_rados(_async_rados),
396 store(_store),
397 lock_name(_lock_name),
398 cookie(_cookie),
399 obj(_obj),
400 req(NULL)
401 {
402 set_description() << "rados unlock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie;
403 }
404
405 void RGWSimpleRadosUnlockCR::request_cleanup()
406 {
407 if (req) {
408 req->finish();
409 req = NULL;
410 }
411 }
412
413 int RGWSimpleRadosUnlockCR::send_request()
414 {
415 set_status() << "sending request";
416
417 req = new RGWAsyncUnlockSystemObj(this, stack->create_completion_notifier(),
418 store, NULL, obj, lock_name, cookie);
419 async_rados->queue(req);
420 return 0;
421 }
422
423 int RGWSimpleRadosUnlockCR::request_complete()
424 {
425 set_status() << "request complete; ret=" << req->get_ret_status();
426 return req->get_ret_status();
427 }
428
429
430 int RGWOmapAppend::operate() {
431 reenter(this) {
432 for (;;) {
433 if (!has_product() && going_down) {
434 set_status() << "going down";
435 break;
436 }
437 set_status() << "waiting for product";
438 yield wait_for_product();
439 yield {
440 string entry;
441 while (consume(&entry)) {
442 set_status() << "adding entry: " << entry;
443 entries[entry] = bufferlist();
444 if (entries.size() >= window_size) {
445 break;
446 }
447 }
448 if (entries.size() >= window_size || going_down) {
449 set_status() << "flushing to omap";
450 call(new RGWRadosSetOmapKeysCR(store, obj, entries));
451 entries.clear();
452 }
453 }
454 if (get_ret_status() < 0) {
455 ldout(cct, 0) << "ERROR: failed to store entries in omap" << dendl;
456 return set_state(RGWCoroutine_Error);
457 }
458 }
459 /* done with coroutine */
460 return set_state(RGWCoroutine_Done);
461 }
462 return 0;
463 }
464
465 void RGWOmapAppend::flush_pending() {
466 receive(pending_entries);
467 num_pending_entries = 0;
468 }
469
470 bool RGWOmapAppend::append(const string& s) {
471 if (is_done()) {
472 return false;
473 }
474 ++total_entries;
475 pending_entries.push_back(s);
476 if (++num_pending_entries >= (int)window_size) {
477 flush_pending();
478 }
479 return true;
480 }
481
482 bool RGWOmapAppend::finish() {
483 going_down = true;
484 flush_pending();
485 set_sleeping(false);
486 return (!is_done());
487 }
488
489 int RGWAsyncGetBucketInstanceInfo::_send_request()
490 {
491 RGWObjectCtx obj_ctx(store);
492 int r = store->get_bucket_instance_info(obj_ctx, bucket, *bucket_info, NULL, NULL);
493 if (r < 0) {
494 ldout(store->ctx(), 0) << "ERROR: failed to get bucket instance info for "
495 << bucket << dendl;
496 return r;
497 }
498
499 return 0;
500 }
501
502 int RGWAsyncFetchRemoteObj::_send_request()
503 {
504 RGWObjectCtx obj_ctx(store);
505
506 string user_id;
507 char buf[16];
508 snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
509 string client_id = store->zone_id() + buf;
510 string op_id = store->unique_id(store->get_new_req_id());
511 map<string, bufferlist> attrs;
512
513 rgw_obj src_obj(bucket_info.bucket, key);
514
515 rgw_obj dest_obj(src_obj);
516
517 int r = store->fetch_remote_obj(obj_ctx,
518 user_id,
519 client_id,
520 op_id,
521 false, /* don't record op state in ops log */
522 NULL, /* req_info */
523 source_zone,
524 dest_obj,
525 src_obj,
526 bucket_info, /* dest */
527 bucket_info, /* source */
528 NULL, /* real_time* src_mtime, */
529 NULL, /* real_time* mtime, */
530 NULL, /* const real_time* mod_ptr, */
531 NULL, /* const real_time* unmod_ptr, */
532 false, /* high precision time */
533 NULL, /* const char *if_match, */
534 NULL, /* const char *if_nomatch, */
535 RGWRados::ATTRSMOD_NONE,
536 copy_if_newer,
537 attrs,
538 RGW_OBJ_CATEGORY_MAIN,
539 versioned_epoch,
540 real_time(), /* delete_at */
541 &key.instance, /* string *version_id, */
542 NULL, /* string *ptag, */
543 NULL, /* string *petag, */
544 NULL, /* struct rgw_err *err, */
545 NULL, /* void (*progress_cb)(off_t, void *), */
546 NULL); /* void *progress_data*); */
547
548 if (r < 0) {
549 ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
550 }
551 return r;
552 }
553
554 int RGWAsyncStatRemoteObj::_send_request()
555 {
556 RGWObjectCtx obj_ctx(store);
557
558 string user_id;
559 char buf[16];
560 snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
561 string client_id = store->zone_id() + buf;
562 string op_id = store->unique_id(store->get_new_req_id());
563
564 rgw_obj src_obj(bucket_info.bucket, key);
565
566 rgw_obj dest_obj(src_obj);
567
568 int r = store->stat_remote_obj(obj_ctx,
569 user_id,
570 client_id,
571 nullptr, /* req_info */
572 source_zone,
573 src_obj,
574 bucket_info, /* source */
575 pmtime, /* real_time* src_mtime, */
576 psize, /* uint64_t * */
577 nullptr, /* const real_time* mod_ptr, */
578 nullptr, /* const real_time* unmod_ptr, */
579 true, /* high precision time */
580 nullptr, /* const char *if_match, */
581 nullptr, /* const char *if_nomatch, */
582 pattrs,
583 nullptr,
584 nullptr, /* string *ptag, */
585 nullptr); /* string *petag, */
586
587 if (r < 0) {
588 ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
589 }
590 return r;
591 }
592
593
594 int RGWAsyncRemoveObj::_send_request()
595 {
596 RGWObjectCtx obj_ctx(store);
597
598 rgw_obj obj(bucket_info.bucket, key);
599
600 ldout(store->ctx(), 0) << __func__ << "(): deleting obj=" << obj << dendl;
601
602 obj_ctx.obj.set_atomic(obj);
603
604 RGWObjState *state;
605
606 int ret = store->get_obj_state(&obj_ctx, bucket_info, obj, &state);
607 if (ret < 0) {
608 ldout(store->ctx(), 20) << __func__ << "(): get_obj_state() obj=" << obj << " returned ret=" << ret << dendl;
609 return ret;
610 }
611
612 /* has there been any racing object write? */
613 if (del_if_older && (state->mtime > timestamp)) {
614 ldout(store->ctx(), 20) << __func__ << "(): skipping object removal obj=" << obj << " (obj mtime=" << state->mtime << ", request timestamp=" << timestamp << ")" << dendl;
615 return 0;
616 }
617
618 RGWAccessControlPolicy policy;
619
620 /* decode policy */
621 map<string, bufferlist>::iterator iter = state->attrset.find(RGW_ATTR_ACL);
622 if (iter != state->attrset.end()) {
623 bufferlist::iterator bliter = iter->second.begin();
624 try {
625 policy.decode(bliter);
626 } catch (buffer::error& err) {
627 ldout(store->ctx(), 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
628 return -EIO;
629 }
630 }
631
632 RGWRados::Object del_target(store, bucket_info, obj_ctx, obj);
633 RGWRados::Object::Delete del_op(&del_target);
634
635 del_op.params.bucket_owner = bucket_info.owner;
636 del_op.params.obj_owner = policy.get_owner();
637 if (del_if_older) {
638 del_op.params.unmod_since = timestamp;
639 }
640 if (versioned) {
641 del_op.params.versioning_status = BUCKET_VERSIONED;
642 }
643 del_op.params.olh_epoch = versioned_epoch;
644 del_op.params.marker_version_id = marker_version_id;
645 del_op.params.obj_owner.set_id(owner);
646 del_op.params.obj_owner.set_name(owner_display_name);
647 del_op.params.mtime = timestamp;
648 del_op.params.high_precision_time = true;
649
650 ret = del_op.delete_obj();
651 if (ret < 0) {
652 ldout(store->ctx(), 20) << __func__ << "(): delete_obj() obj=" << obj << " returned ret=" << ret << dendl;
653 }
654 return ret;
655 }
656
657 int RGWContinuousLeaseCR::operate()
658 {
659 if (aborted) {
660 caller->set_sleeping(false);
661 return set_cr_done();
662 }
663 reenter(this) {
664 while (!going_down) {
665 yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
666
667 caller->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
668 if (retcode < 0) {
669 set_locked(false);
670 ldout(store->ctx(), 20) << *this << ": couldn't lock " << obj << ":" << lock_name << ": retcode=" << retcode << dendl;
671 return set_state(RGWCoroutine_Error, retcode);
672 }
673 set_locked(true);
674 yield wait(utime_t(interval / 2, 0));
675 }
676 set_locked(false); /* moot at this point anyway */
677 yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie));
678 return set_state(RGWCoroutine_Done);
679 }
680 return 0;
681 }
682
683 RGWRadosTimelogAddCR::RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
684 const cls_log_entry& entry) : RGWSimpleCoroutine(_store->ctx()),
685 store(_store),
686 oid(_oid), cn(NULL)
687 {
688 stringstream& s = set_description();
689 s << "timelog add entry oid=" << oid << "entry={id=" << entry.id << ", section=" << entry.section << ", name=" << entry.name << "}";
690 entries.push_back(entry);
691 }
692
693 RGWRadosTimelogAddCR::~RGWRadosTimelogAddCR()
694 {
695 if (cn) {
696 cn->put();
697 }
698 }
699
700 int RGWRadosTimelogAddCR::send_request()
701 {
702 set_status() << "sending request";
703
704 cn = stack->create_completion_notifier();
705 cn->get();
706 return store->time_log_add(oid, entries, cn->completion(), true);
707 }
708
709 int RGWRadosTimelogAddCR::request_complete()
710 {
711 int r = cn->completion()->get_return_value();
712
713 set_status() << "request complete; ret=" << r;
714
715 return r;
716 }
717
718 RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(RGWRados *store,
719 const std::string& oid,
720 const real_time& start_time,
721 const real_time& end_time,
722 const std::string& from_marker,
723 const std::string& to_marker)
724 : RGWSimpleCoroutine(store->ctx()), store(store), oid(oid),
725 start_time(start_time), end_time(end_time),
726 from_marker(from_marker), to_marker(to_marker)
727 {
728 set_description() << "timelog trim oid=" << oid
729 << " start_time=" << start_time << " end_time=" << end_time
730 << " from_marker=" << from_marker << " to_marker=" << to_marker;
731 }
732
733 RGWRadosTimelogTrimCR::~RGWRadosTimelogTrimCR()
734 {
735 if (cn) {
736 cn->put();
737 }
738 }
739
740 int RGWRadosTimelogTrimCR::send_request()
741 {
742 set_status() << "sending request";
743
744 cn = stack->create_completion_notifier();
745 cn->get();
746 return store->time_log_trim(oid, start_time, end_time, from_marker,
747 to_marker, cn->completion());
748 }
749
750 int RGWRadosTimelogTrimCR::request_complete()
751 {
752 int r = cn->completion()->get_return_value();
753
754 set_status() << "request complete; ret=" << r;
755
756 return r;
757 }
758
759
760 RGWSyncLogTrimCR::RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
761 const std::string& to_marker,
762 std::string *last_trim_marker)
763 : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{},
764 std::string{}, to_marker),
765 cct(store->ctx()), last_trim_marker(last_trim_marker)
766 {
767 }
768
769 int RGWSyncLogTrimCR::request_complete()
770 {
771 int r = RGWRadosTimelogTrimCR::request_complete();
772 if (r < 0 && r != -ENODATA) {
773 return r;
774 }
775 if (*last_trim_marker < to_marker) {
776 *last_trim_marker = to_marker;
777 }
778 return 0;
779 }
780
781
782 int RGWAsyncStatObj::_send_request()
783 {
784 rgw_raw_obj raw_obj;
785 store->obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
786 return store->raw_obj_stat(raw_obj, psize, pmtime, pepoch,
787 nullptr, nullptr, objv_tracker);
788 }
789
790 RGWStatObjCR::RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
791 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize,
792 real_time* pmtime, uint64_t *pepoch,
793 RGWObjVersionTracker *objv_tracker)
794 : RGWSimpleCoroutine(store->ctx()), store(store), async_rados(async_rados),
795 bucket_info(_bucket_info), obj(obj), psize(psize), pmtime(pmtime), pepoch(pepoch),
796 objv_tracker(objv_tracker)
797 {
798 }
799
800 void RGWStatObjCR::request_cleanup()
801 {
802 if (req) {
803 req->finish();
804 req = NULL;
805 }
806 }
807
808 int RGWStatObjCR::send_request()
809 {
810 req = new RGWAsyncStatObj(this, stack->create_completion_notifier(),
811 store, bucket_info, obj, psize, pmtime, pepoch, objv_tracker);
812 async_rados->queue(req);
813 return 0;
814 }
815
816 int RGWStatObjCR::request_complete()
817 {
818 return req->get_ret_status();
819 }