]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_cr_rados.cc
update sources to v12.1.1
[ceph.git] / ceph / src / rgw / rgw_cr_rados.cc
CommitLineData
7c673cae
FG
1#include "rgw_rados.h"
2#include "rgw_coroutine.h"
7c673cae
FG
3#include "rgw_cr_rados.h"
4
5#include "cls/lock/cls_lock_client.h"
6
31f18b77
FG
7#include <boost/asio/yield.hpp>
8
7c673cae
FG
9#define dout_context g_ceph_context
10#define dout_subsys ceph_subsys_rgw
11
12bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest *req) {
13 if (processor->is_going_down()) {
14 return false;
15 }
16 req->get();
17 processor->m_req_queue.push_back(req);
18 dout(20) << "enqueued request req=" << hex << req << dec << dendl;
19 _dump_queue();
20 return true;
21}
22
23bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
24 return processor->m_req_queue.empty();
25}
26
27RGWAsyncRadosRequest *RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
28 if (processor->m_req_queue.empty())
29 return NULL;
30 RGWAsyncRadosRequest *req = processor->m_req_queue.front();
31 processor->m_req_queue.pop_front();
32 dout(20) << "dequeued request req=" << hex << req << dec << dendl;
33 _dump_queue();
34 return req;
35}
36
37void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) {
38 processor->handle_request(req);
39 processor->req_throttle.put(1);
40}
41
42void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
43 if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
44 return;
45 }
46 deque<RGWAsyncRadosRequest *>::iterator iter;
47 if (processor->m_req_queue.empty()) {
48 dout(20) << "RGWWQ: empty" << dendl;
49 return;
50 }
51 dout(20) << "RGWWQ:" << dendl;
52 for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) {
53 dout(20) << "req: " << hex << *iter << dec << dendl;
54 }
55}
56
57RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(RGWRados *_store, int num_threads)
58 : store(_store), m_tp(store->ctx(), "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
59 req_throttle(store->ctx(), "rgw_async_rados_ops", num_threads * 2),
60 req_wq(this, g_conf->rgw_op_thread_timeout,
61 g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
62}
63
64void RGWAsyncRadosProcessor::start() {
65 m_tp.start();
66}
67
68void RGWAsyncRadosProcessor::stop() {
69 going_down = true;
70 m_tp.drain(&req_wq);
71 m_tp.stop();
72 for (auto iter = m_req_queue.begin(); iter != m_req_queue.end(); ++iter) {
73 (*iter)->put();
74 }
75}
76
77void RGWAsyncRadosProcessor::handle_request(RGWAsyncRadosRequest *req) {
78 req->send_request();
79 req->put();
80}
81
82void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest *req) {
83 req_throttle.get(1);
84 req_wq.queue(req);
85}
86
87int RGWAsyncGetSystemObj::_send_request()
88{
89 return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, pattrs, NULL);
90}
91
92RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
93 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
94 bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(caller, cn), store(_store), obj_ctx(_obj_ctx),
95 objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl), pattrs(NULL),
96 ofs(_ofs), end(_end)
97{
98}
99
100int RGWSimpleRadosReadAttrsCR::send_request()
101{
102 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
103 store, &obj_ctx, NULL,
104 obj,
105 &bl, 0, -1);
106 if (pattrs) {
107 req->set_read_attrs(pattrs);
108 }
109 async_rados->queue(req);
110 return 0;
111}
112
113int RGWSimpleRadosReadAttrsCR::request_complete()
114{
115 return req->get_ret_status();
116}
117
118int RGWAsyncPutSystemObj::_send_request()
119{
120 return store->put_system_obj_data(NULL, obj, bl, -1, exclusive, objv_tracker);
121}
122
123RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
124 RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
125 bool _exclusive, bufferlist& _bl)
126 : RGWAsyncRadosRequest(caller, cn), store(_store), objv_tracker(_objv_tracker),
127 obj(_obj), exclusive(_exclusive), bl(_bl)
128{
129}
130
131int RGWAsyncPutSystemObjAttrs::_send_request()
132{
133 return store->system_obj_set_attrs(NULL, obj, *attrs, NULL, objv_tracker);
134}
135
136RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
137 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
138 map<string, bufferlist> *_attrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
139 objv_tracker(_objv_tracker), obj(_obj),
140 attrs(_attrs)
141{
142}
143
144
145RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj,
146 uint64_t _window_size)
147 : RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
148 store(_store), obj(_obj), going_down(false), num_pending_entries(0), window_size(_window_size), total_entries(0)
149{
150}
151
152int RGWAsyncLockSystemObj::_send_request()
153{
154 rgw_rados_ref ref;
155 int r = store->get_raw_obj_ref(obj, &ref);
156 if (r < 0) {
157 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
158 return r;
159 }
160
161 rados::cls::lock::Lock l(lock_name);
162 utime_t duration(duration_secs, 0);
163 l.set_duration(duration);
164 l.set_cookie(cookie);
165 l.set_renew(true);
166
167 return l.lock_exclusive(&ref.ioctx, ref.oid);
168}
169
170RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
171 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
172 const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(caller, cn), store(_store),
173 obj(_obj),
174 lock_name(_name),
175 cookie(_cookie),
176 duration_secs(_duration_secs)
177{
178}
179
180int RGWAsyncUnlockSystemObj::_send_request()
181{
182 rgw_rados_ref ref;
183 int r = store->get_raw_obj_ref(obj, &ref);
184 if (r < 0) {
185 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
186 return r;
187 }
188
189 rados::cls::lock::Lock l(lock_name);
190
191 l.set_cookie(cookie);
192
193 return l.unlock(&ref.ioctx, ref.oid);
194}
195
196RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
197 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
198 const string& _name, const string& _cookie) : RGWAsyncRadosRequest(caller, cn), store(_store),
199 obj(_obj),
200 lock_name(_name), cookie(_cookie)
201{
202}
203
204
205RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(RGWRados *_store,
206 const rgw_raw_obj& _obj,
207 map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
208 store(_store),
209 entries(_entries),
210 obj(_obj), cn(NULL)
211{
212 stringstream& s = set_description();
213 s << "set omap keys dest=" << obj << " keys=[" << s.str() << "]";
214 for (auto i = entries.begin(); i != entries.end(); ++i) {
215 if (i != entries.begin()) {
216 s << ", ";
217 }
218 s << i->first;
219 }
220 s << "]";
221}
222
7c673cae
FG
223int RGWRadosSetOmapKeysCR::send_request()
224{
225 int r = store->get_raw_obj_ref(obj, &ref);
226 if (r < 0) {
227 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
228 return r;
229 }
230
231 set_status() << "sending request";
232
233 librados::ObjectWriteOperation op;
234 op.omap_set(entries);
235
236 cn = stack->create_completion_notifier();
7c673cae
FG
237 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op);
238}
239
240int RGWRadosSetOmapKeysCR::request_complete()
241{
242 int r = cn->completion()->get_return_value();
243
244 set_status() << "request complete; ret=" << r;
245
246 return r;
247}
248
249RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store,
250 const rgw_raw_obj& _obj,
251 const string& _marker,
252 map<string, bufferlist> *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()),
253 store(_store),
254 marker(_marker),
255 entries(_entries), max_entries(_max_entries), rval(0),
256 obj(_obj), cn(NULL)
257{
258 set_description() << "set omap keys dest=" << obj << " marker=" << marker;
259}
260
7c673cae
FG
261int RGWRadosGetOmapKeysCR::send_request() {
262 int r = store->get_raw_obj_ref(obj, &ref);
263 if (r < 0) {
264 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
265 return r;
266 }
267
268 set_status() << "send request";
269
270 librados::ObjectReadOperation op;
271 op.omap_get_vals2(marker, max_entries, entries, nullptr, &rval);
272
273 cn = stack->create_completion_notifier();
274 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op, NULL);
275}
276
277RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(RGWRados *_store,
278 const rgw_raw_obj& _obj,
279 const set<string>& _keys) : RGWSimpleCoroutine(_store->ctx()),
280 store(_store),
281 keys(_keys),
282 obj(_obj), cn(NULL)
283{
284 set_description() << "remove omap keys dest=" << obj << " keys=" << keys;
285}
286
7c673cae
FG
287int RGWRadosRemoveOmapKeysCR::send_request() {
288 int r = store->get_raw_obj_ref(obj, &ref);
289 if (r < 0) {
290 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
291 return r;
292 }
293
294 set_status() << "send request";
295
296 librados::ObjectWriteOperation op;
297 op.omap_rm_keys(keys);
298
299 cn = stack->create_completion_notifier();
300 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op);
301}
302
224ce89b
WB
303int RGWRadosRemoveOmapKeysCR::request_complete()
304{
305 int r = cn->completion()->get_return_value();
306
307 set_status() << "request complete; ret=" << r;
308
309 return r;
310}
311
7c673cae
FG
312RGWRadosRemoveCR::RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj)
313 : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj)
314{
315 set_description() << "remove dest=" << obj;
316}
317
318int RGWRadosRemoveCR::send_request()
319{
320 auto rados = store->get_rados_handle();
321 int r = rados->ioctx_create(obj.pool.name.c_str(), ioctx);
322 if (r < 0) {
323 lderr(cct) << "ERROR: failed to open pool (" << obj.pool.name << ") ret=" << r << dendl;
324 return r;
325 }
326 ioctx.locator_set_key(obj.loc);
327
328 set_status() << "send request";
329
330 librados::ObjectWriteOperation op;
331 op.remove();
332
333 cn = stack->create_completion_notifier();
334 return ioctx.aio_operate(obj.oid, cn->completion(), &op);
335}
336
337int RGWRadosRemoveCR::request_complete()
338{
339 int r = cn->completion()->get_return_value();
340
341 set_status() << "request complete; ret=" << r;
342
343 return r;
344}
345
346RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
347 const rgw_raw_obj& _obj,
348 const string& _lock_name,
349 const string& _cookie,
350 uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
351 async_rados(_async_rados),
352 store(_store),
353 lock_name(_lock_name),
354 cookie(_cookie),
355 duration(_duration),
356 obj(_obj),
357 req(NULL)
358{
359 set_description() << "rados lock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie << " duration=" << duration;
360}
361
362void RGWSimpleRadosLockCR::request_cleanup()
363{
364 if (req) {
365 req->finish();
366 req = NULL;
367 }
368}
369
370int RGWSimpleRadosLockCR::send_request()
371{
372 set_status() << "sending request";
373 req = new RGWAsyncLockSystemObj(this, stack->create_completion_notifier(),
374 store, NULL, obj, lock_name, cookie, duration);
375 async_rados->queue(req);
376 return 0;
377}
378
379int RGWSimpleRadosLockCR::request_complete()
380{
381 set_status() << "request complete; ret=" << req->get_ret_status();
382 return req->get_ret_status();
383}
384
385RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
386 const rgw_raw_obj& _obj,
387 const string& _lock_name,
388 const string& _cookie) : RGWSimpleCoroutine(_store->ctx()),
389 async_rados(_async_rados),
390 store(_store),
391 lock_name(_lock_name),
392 cookie(_cookie),
393 obj(_obj),
394 req(NULL)
395{
396 set_description() << "rados unlock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie;
397}
398
399void RGWSimpleRadosUnlockCR::request_cleanup()
400{
401 if (req) {
402 req->finish();
403 req = NULL;
404 }
405}
406
407int RGWSimpleRadosUnlockCR::send_request()
408{
409 set_status() << "sending request";
410
411 req = new RGWAsyncUnlockSystemObj(this, stack->create_completion_notifier(),
412 store, NULL, obj, lock_name, cookie);
413 async_rados->queue(req);
414 return 0;
415}
416
417int RGWSimpleRadosUnlockCR::request_complete()
418{
419 set_status() << "request complete; ret=" << req->get_ret_status();
420 return req->get_ret_status();
421}
422
423
424int RGWOmapAppend::operate() {
425 reenter(this) {
426 for (;;) {
427 if (!has_product() && going_down) {
428 set_status() << "going down";
429 break;
430 }
431 set_status() << "waiting for product";
432 yield wait_for_product();
433 yield {
434 string entry;
435 while (consume(&entry)) {
436 set_status() << "adding entry: " << entry;
437 entries[entry] = bufferlist();
438 if (entries.size() >= window_size) {
439 break;
440 }
441 }
442 if (entries.size() >= window_size || going_down) {
443 set_status() << "flushing to omap";
444 call(new RGWRadosSetOmapKeysCR(store, obj, entries));
445 entries.clear();
446 }
447 }
448 if (get_ret_status() < 0) {
449 ldout(cct, 0) << "ERROR: failed to store entries in omap" << dendl;
450 return set_state(RGWCoroutine_Error);
451 }
452 }
453 /* done with coroutine */
454 return set_state(RGWCoroutine_Done);
455 }
456 return 0;
457}
458
459void RGWOmapAppend::flush_pending() {
460 receive(pending_entries);
461 num_pending_entries = 0;
462}
463
464bool RGWOmapAppend::append(const string& s) {
465 if (is_done()) {
466 return false;
467 }
468 ++total_entries;
469 pending_entries.push_back(s);
470 if (++num_pending_entries >= (int)window_size) {
471 flush_pending();
472 }
473 return true;
474}
475
476bool RGWOmapAppend::finish() {
477 going_down = true;
478 flush_pending();
479 set_sleeping(false);
480 return (!is_done());
481}
482
483int RGWAsyncGetBucketInstanceInfo::_send_request()
484{
485 RGWObjectCtx obj_ctx(store);
486 int r = store->get_bucket_instance_info(obj_ctx, bucket, *bucket_info, NULL, NULL);
487 if (r < 0) {
488 ldout(store->ctx(), 0) << "ERROR: failed to get bucket instance info for "
489 << bucket << dendl;
490 return r;
491 }
492
493 return 0;
494}
495
496int RGWAsyncFetchRemoteObj::_send_request()
497{
498 RGWObjectCtx obj_ctx(store);
499
500 string user_id;
501 char buf[16];
502 snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
503 string client_id = store->zone_id() + buf;
504 string op_id = store->unique_id(store->get_new_req_id());
505 map<string, bufferlist> attrs;
506
507 rgw_obj src_obj(bucket_info.bucket, key);
508
509 rgw_obj dest_obj(src_obj);
510
511 int r = store->fetch_remote_obj(obj_ctx,
512 user_id,
513 client_id,
514 op_id,
515 false, /* don't record op state in ops log */
516 NULL, /* req_info */
517 source_zone,
518 dest_obj,
519 src_obj,
520 bucket_info, /* dest */
521 bucket_info, /* source */
522 NULL, /* real_time* src_mtime, */
523 NULL, /* real_time* mtime, */
524 NULL, /* const real_time* mod_ptr, */
525 NULL, /* const real_time* unmod_ptr, */
526 false, /* high precision time */
527 NULL, /* const char *if_match, */
528 NULL, /* const char *if_nomatch, */
529 RGWRados::ATTRSMOD_NONE,
530 copy_if_newer,
531 attrs,
532 RGW_OBJ_CATEGORY_MAIN,
533 versioned_epoch,
534 real_time(), /* delete_at */
535 &key.instance, /* string *version_id, */
536 NULL, /* string *ptag, */
537 NULL, /* string *petag, */
7c673cae 538 NULL, /* void (*progress_cb)(off_t, void *), */
31f18b77
FG
539 NULL, /* void *progress_data*); */
540 zones_trace);
7c673cae
FG
541
542 if (r < 0) {
543 ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
544 }
545 return r;
546}
547
548int RGWAsyncStatRemoteObj::_send_request()
549{
550 RGWObjectCtx obj_ctx(store);
551
552 string user_id;
553 char buf[16];
554 snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
555 string client_id = store->zone_id() + buf;
556 string op_id = store->unique_id(store->get_new_req_id());
557
558 rgw_obj src_obj(bucket_info.bucket, key);
559
560 rgw_obj dest_obj(src_obj);
561
562 int r = store->stat_remote_obj(obj_ctx,
563 user_id,
564 client_id,
565 nullptr, /* req_info */
566 source_zone,
567 src_obj,
568 bucket_info, /* source */
569 pmtime, /* real_time* src_mtime, */
570 psize, /* uint64_t * */
571 nullptr, /* const real_time* mod_ptr, */
572 nullptr, /* const real_time* unmod_ptr, */
573 true, /* high precision time */
574 nullptr, /* const char *if_match, */
575 nullptr, /* const char *if_nomatch, */
576 pattrs,
577 nullptr,
578 nullptr, /* string *ptag, */
579 nullptr); /* string *petag, */
580
581 if (r < 0) {
582 ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
583 }
584 return r;
585}
586
587
588int RGWAsyncRemoveObj::_send_request()
589{
590 RGWObjectCtx obj_ctx(store);
591
592 rgw_obj obj(bucket_info.bucket, key);
593
594 ldout(store->ctx(), 0) << __func__ << "(): deleting obj=" << obj << dendl;
595
596 obj_ctx.obj.set_atomic(obj);
597
598 RGWObjState *state;
599
600 int ret = store->get_obj_state(&obj_ctx, bucket_info, obj, &state);
601 if (ret < 0) {
602 ldout(store->ctx(), 20) << __func__ << "(): get_obj_state() obj=" << obj << " returned ret=" << ret << dendl;
603 return ret;
604 }
605
606 /* has there been any racing object write? */
607 if (del_if_older && (state->mtime > timestamp)) {
608 ldout(store->ctx(), 20) << __func__ << "(): skipping object removal obj=" << obj << " (obj mtime=" << state->mtime << ", request timestamp=" << timestamp << ")" << dendl;
609 return 0;
610 }
611
612 RGWAccessControlPolicy policy;
613
614 /* decode policy */
615 map<string, bufferlist>::iterator iter = state->attrset.find(RGW_ATTR_ACL);
616 if (iter != state->attrset.end()) {
617 bufferlist::iterator bliter = iter->second.begin();
618 try {
619 policy.decode(bliter);
620 } catch (buffer::error& err) {
621 ldout(store->ctx(), 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
622 return -EIO;
623 }
624 }
625
626 RGWRados::Object del_target(store, bucket_info, obj_ctx, obj);
627 RGWRados::Object::Delete del_op(&del_target);
628
629 del_op.params.bucket_owner = bucket_info.owner;
630 del_op.params.obj_owner = policy.get_owner();
631 if (del_if_older) {
632 del_op.params.unmod_since = timestamp;
633 }
634 if (versioned) {
635 del_op.params.versioning_status = BUCKET_VERSIONED;
636 }
637 del_op.params.olh_epoch = versioned_epoch;
638 del_op.params.marker_version_id = marker_version_id;
639 del_op.params.obj_owner.set_id(owner);
640 del_op.params.obj_owner.set_name(owner_display_name);
641 del_op.params.mtime = timestamp;
642 del_op.params.high_precision_time = true;
31f18b77 643 del_op.params.zones_trace = zones_trace;
7c673cae
FG
644
645 ret = del_op.delete_obj();
646 if (ret < 0) {
647 ldout(store->ctx(), 20) << __func__ << "(): delete_obj() obj=" << obj << " returned ret=" << ret << dendl;
648 }
649 return ret;
650}
651
652int RGWContinuousLeaseCR::operate()
653{
654 if (aborted) {
655 caller->set_sleeping(false);
656 return set_cr_done();
657 }
658 reenter(this) {
659 while (!going_down) {
660 yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
661
662 caller->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
663 if (retcode < 0) {
664 set_locked(false);
665 ldout(store->ctx(), 20) << *this << ": couldn't lock " << obj << ":" << lock_name << ": retcode=" << retcode << dendl;
666 return set_state(RGWCoroutine_Error, retcode);
667 }
668 set_locked(true);
669 yield wait(utime_t(interval / 2, 0));
670 }
671 set_locked(false); /* moot at this point anyway */
672 yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie));
673 return set_state(RGWCoroutine_Done);
674 }
675 return 0;
676}
677
678RGWRadosTimelogAddCR::RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
679 const cls_log_entry& entry) : RGWSimpleCoroutine(_store->ctx()),
680 store(_store),
681 oid(_oid), cn(NULL)
682{
683 stringstream& s = set_description();
684 s << "timelog add entry oid=" << oid << "entry={id=" << entry.id << ", section=" << entry.section << ", name=" << entry.name << "}";
685 entries.push_back(entry);
686}
687
7c673cae
FG
688int RGWRadosTimelogAddCR::send_request()
689{
690 set_status() << "sending request";
691
692 cn = stack->create_completion_notifier();
7c673cae
FG
693 return store->time_log_add(oid, entries, cn->completion(), true);
694}
695
696int RGWRadosTimelogAddCR::request_complete()
697{
698 int r = cn->completion()->get_return_value();
699
700 set_status() << "request complete; ret=" << r;
701
702 return r;
703}
704
705RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(RGWRados *store,
706 const std::string& oid,
707 const real_time& start_time,
708 const real_time& end_time,
709 const std::string& from_marker,
710 const std::string& to_marker)
711 : RGWSimpleCoroutine(store->ctx()), store(store), oid(oid),
712 start_time(start_time), end_time(end_time),
713 from_marker(from_marker), to_marker(to_marker)
714{
715 set_description() << "timelog trim oid=" << oid
716 << " start_time=" << start_time << " end_time=" << end_time
717 << " from_marker=" << from_marker << " to_marker=" << to_marker;
718}
719
7c673cae
FG
720int RGWRadosTimelogTrimCR::send_request()
721{
722 set_status() << "sending request";
723
724 cn = stack->create_completion_notifier();
7c673cae
FG
725 return store->time_log_trim(oid, start_time, end_time, from_marker,
726 to_marker, cn->completion());
727}
728
729int RGWRadosTimelogTrimCR::request_complete()
730{
731 int r = cn->completion()->get_return_value();
732
733 set_status() << "request complete; ret=" << r;
734
735 return r;
736}
737
738
739RGWSyncLogTrimCR::RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
740 const std::string& to_marker,
741 std::string *last_trim_marker)
742 : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{},
743 std::string{}, to_marker),
744 cct(store->ctx()), last_trim_marker(last_trim_marker)
745{
746}
747
748int RGWSyncLogTrimCR::request_complete()
749{
750 int r = RGWRadosTimelogTrimCR::request_complete();
751 if (r < 0 && r != -ENODATA) {
752 return r;
753 }
754 if (*last_trim_marker < to_marker) {
755 *last_trim_marker = to_marker;
756 }
757 return 0;
758}
759
760
761int RGWAsyncStatObj::_send_request()
762{
763 rgw_raw_obj raw_obj;
764 store->obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
765 return store->raw_obj_stat(raw_obj, psize, pmtime, pepoch,
766 nullptr, nullptr, objv_tracker);
767}
768
769RGWStatObjCR::RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
770 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize,
771 real_time* pmtime, uint64_t *pepoch,
772 RGWObjVersionTracker *objv_tracker)
773 : RGWSimpleCoroutine(store->ctx()), store(store), async_rados(async_rados),
774 bucket_info(_bucket_info), obj(obj), psize(psize), pmtime(pmtime), pepoch(pepoch),
775 objv_tracker(objv_tracker)
776{
777}
778
779void RGWStatObjCR::request_cleanup()
780{
781 if (req) {
782 req->finish();
783 req = NULL;
784 }
785}
786
787int RGWStatObjCR::send_request()
788{
789 req = new RGWAsyncStatObj(this, stack->create_completion_notifier(),
790 store, bucket_info, obj, psize, pmtime, pepoch, objv_tracker);
791 async_rados->queue(req);
792 return 0;
793}
794
795int RGWStatObjCR::request_complete()
796{
797 return req->get_ret_status();
798}