]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/driver/rados/rgw_cr_rados.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / driver / rados / rgw_cr_rados.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "include/compat.h"
5 #include "rgw_sal.h"
6 #include "rgw_zone.h"
7 #include "rgw_coroutine.h"
8 #include "rgw_cr_rados.h"
9 #include "rgw_sync_counters.h"
10 #include "rgw_bucket.h"
11 #include "rgw_datalog_notify.h"
12 #include "rgw_cr_rest.h"
13 #include "rgw_rest_conn.h"
14 #include "rgw_rados.h"
15
16 #include "services/svc_zone.h"
17 #include "services/svc_zone_utils.h"
18 #include "services/svc_sys_obj.h"
19 #include "services/svc_cls.h"
20
21 #include "cls/lock/cls_lock_client.h"
22 #include "cls/rgw/cls_rgw_client.h"
23
24 #include <boost/asio/yield.hpp>
25 #include <boost/container/flat_set.hpp>
26
27 #define dout_context g_ceph_context
28 #define dout_subsys ceph_subsys_rgw
29
30 using namespace std;
31
32 bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest *req) {
33 if (processor->is_going_down()) {
34 return false;
35 }
36 req->get();
37 processor->m_req_queue.push_back(req);
38 dout(20) << "enqueued request req=" << hex << req << dec << dendl;
39 _dump_queue();
40 return true;
41 }
42
43 bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
44 return processor->m_req_queue.empty();
45 }
46
47 RGWAsyncRadosRequest *RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
48 if (processor->m_req_queue.empty())
49 return NULL;
50 RGWAsyncRadosRequest *req = processor->m_req_queue.front();
51 processor->m_req_queue.pop_front();
52 dout(20) << "dequeued request req=" << hex << req << dec << dendl;
53 _dump_queue();
54 return req;
55 }
56
57 void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) {
58 processor->handle_request(this, req);
59 processor->req_throttle.put(1);
60 }
61
62 void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
63 if (!g_conf()->subsys.should_gather<ceph_subsys_rgw, 20>()) {
64 return;
65 }
66 deque<RGWAsyncRadosRequest *>::iterator iter;
67 if (processor->m_req_queue.empty()) {
68 dout(20) << "RGWWQ: empty" << dendl;
69 return;
70 }
71 dout(20) << "RGWWQ:" << dendl;
72 for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) {
73 dout(20) << "req: " << hex << *iter << dec << dendl;
74 }
75 }
76
77 RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(CephContext *_cct, int num_threads)
78 : cct(_cct), m_tp(cct, "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
79 req_throttle(_cct, "rgw_async_rados_ops", num_threads * 2),
80 req_wq(this,
81 ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
82 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
83 &m_tp) {
84 }
85
86 void RGWAsyncRadosProcessor::start() {
87 m_tp.start();
88 }
89
90 void RGWAsyncRadosProcessor::stop() {
91 going_down = true;
92 m_tp.drain(&req_wq);
93 m_tp.stop();
94 for (auto iter = m_req_queue.begin(); iter != m_req_queue.end(); ++iter) {
95 (*iter)->put();
96 }
97 }
98
99 void RGWAsyncRadosProcessor::handle_request(const DoutPrefixProvider *dpp, RGWAsyncRadosRequest *req) {
100 req->send_request(dpp);
101 req->put();
102 }
103
104 void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest *req) {
105 req_throttle.get(1);
106 req_wq.queue(req);
107 }
108
109 int RGWAsyncGetSystemObj::_send_request(const DoutPrefixProvider *dpp)
110 {
111 map<string, bufferlist> *pattrs = want_attrs ? &attrs : nullptr;
112
113 auto sysobj = svc_sysobj->get_obj(obj);
114 return sysobj.rop()
115 .set_objv_tracker(&objv_tracker)
116 .set_attrs(pattrs)
117 .set_raw_attrs(raw_attrs)
118 .read(dpp, &bl, null_yield);
119 }
120
121 RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(const DoutPrefixProvider *_dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
122 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
123 bool want_attrs, bool raw_attrs)
124 : RGWAsyncRadosRequest(caller, cn), dpp(_dpp), svc_sysobj(_svc),
125 obj(_obj), want_attrs(want_attrs), raw_attrs(raw_attrs)
126 {
127 if (_objv_tracker) {
128 objv_tracker = *_objv_tracker;
129 }
130 }
131
132 int RGWSimpleRadosReadAttrsCR::send_request(const DoutPrefixProvider *dpp)
133 {
134 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
135 if (r < 0) {
136 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret="
137 << r << dendl;
138 return r;
139 }
140
141 set_status() << "sending request";
142
143 librados::ObjectReadOperation op;
144 if (objv_tracker) {
145 objv_tracker->prepare_op_for_read(&op);
146 }
147
148 if (raw_attrs && pattrs) {
149 op.getxattrs(pattrs, nullptr);
150 } else {
151 op.getxattrs(&unfiltered_attrs, nullptr);
152 }
153
154 cn = stack->create_completion_notifier();
155 return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op,
156 nullptr);
157 }
158
159 int RGWSimpleRadosReadAttrsCR::request_complete()
160 {
161 int ret = cn->completion()->get_return_value();
162 set_status() << "request complete; ret=" << ret;
163 if (!raw_attrs && pattrs) {
164 rgw_filter_attrset(unfiltered_attrs, RGW_ATTR_PREFIX, pattrs);
165 }
166 return ret;
167 }
168
169 int RGWAsyncPutSystemObj::_send_request(const DoutPrefixProvider *dpp)
170 {
171 auto sysobj = svc->get_obj(obj);
172 return sysobj.wop()
173 .set_objv_tracker(&objv_tracker)
174 .set_exclusive(exclusive)
175 .write_data(dpp, bl, null_yield);
176 }
177
178 RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(const DoutPrefixProvider *_dpp,
179 RGWCoroutine *caller,
180 RGWAioCompletionNotifier *cn,
181 RGWSI_SysObj *_svc,
182 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
183 bool _exclusive, bufferlist _bl)
184 : RGWAsyncRadosRequest(caller, cn), dpp(_dpp), svc(_svc),
185 obj(_obj), exclusive(_exclusive), bl(std::move(_bl))
186 {
187 if (_objv_tracker) {
188 objv_tracker = *_objv_tracker;
189 }
190 }
191
192 int RGWAsyncPutSystemObjAttrs::_send_request(const DoutPrefixProvider *dpp)
193 {
194 auto sysobj = svc->get_obj(obj);
195 return sysobj.wop()
196 .set_objv_tracker(&objv_tracker)
197 .set_exclusive(exclusive)
198 .set_attrs(attrs)
199 .write_attrs(dpp, null_yield);
200 }
201
202 RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(const DoutPrefixProvider *_dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
203 RGWSI_SysObj *_svc,
204 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
205 map<string, bufferlist> _attrs, bool exclusive)
206 : RGWAsyncRadosRequest(caller, cn), dpp(_dpp), svc(_svc),
207 obj(_obj), attrs(std::move(_attrs)), exclusive(exclusive)
208 {
209 if (_objv_tracker) {
210 objv_tracker = *_objv_tracker;
211 }
212 }
213
214
215 RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store, const rgw_raw_obj& _obj,
216 uint64_t _window_size)
217 : RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
218 store(_store), obj(_obj), going_down(false), num_pending_entries(0), window_size(_window_size), total_entries(0)
219 {
220 }
221
222 int RGWAsyncLockSystemObj::_send_request(const DoutPrefixProvider *dpp)
223 {
224 rgw_rados_ref ref;
225 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
226 if (r < 0) {
227 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
228 return r;
229 }
230
231 rados::cls::lock::Lock l(lock_name);
232 utime_t duration(duration_secs, 0);
233 l.set_duration(duration);
234 l.set_cookie(cookie);
235 l.set_may_renew(true);
236
237 return l.lock_exclusive(&ref.pool.ioctx(), ref.obj.oid);
238 }
239
240 RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
241 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
242 const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(caller, cn), store(_store),
243 obj(_obj),
244 lock_name(_name),
245 cookie(_cookie),
246 duration_secs(_duration_secs)
247 {
248 }
249
250 int RGWAsyncUnlockSystemObj::_send_request(const DoutPrefixProvider *dpp)
251 {
252 rgw_rados_ref ref;
253 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
254 if (r < 0) {
255 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
256 return r;
257 }
258
259 rados::cls::lock::Lock l(lock_name);
260
261 l.set_cookie(cookie);
262
263 return l.unlock(&ref.pool.ioctx(), ref.obj.oid);
264 }
265
266 RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
267 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
268 const string& _name, const string& _cookie) : RGWAsyncRadosRequest(caller, cn), store(_store),
269 obj(_obj),
270 lock_name(_name), cookie(_cookie)
271 {
272 }
273
274 RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(rgw::sal::RadosStore* _store,
275 const rgw_raw_obj& _obj,
276 map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
277 store(_store),
278 entries(_entries),
279 obj(_obj), cn(NULL)
280 {
281 stringstream& s = set_description();
282 s << "set omap keys dest=" << obj << " keys=[" << s.str() << "]";
283 for (auto i = entries.begin(); i != entries.end(); ++i) {
284 if (i != entries.begin()) {
285 s << ", ";
286 }
287 s << i->first;
288 }
289 s << "]";
290 }
291
292 int RGWRadosSetOmapKeysCR::send_request(const DoutPrefixProvider *dpp)
293 {
294 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
295 if (r < 0) {
296 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
297 return r;
298 }
299
300 set_status() << "sending request";
301
302 librados::ObjectWriteOperation op;
303 op.omap_set(entries);
304
305 cn = stack->create_completion_notifier();
306 return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op);
307 }
308
309 int RGWRadosSetOmapKeysCR::request_complete()
310 {
311 int r = cn->completion()->get_return_value();
312
313 set_status() << "request complete; ret=" << r;
314
315 return r;
316 }
317
318 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(rgw::sal::RadosStore* _store,
319 const rgw_raw_obj& _obj,
320 const string& _marker,
321 int _max_entries,
322 ResultPtr _result)
323 : RGWSimpleCoroutine(_store->ctx()), store(_store), obj(_obj),
324 marker(_marker), max_entries(_max_entries),
325 result(std::move(_result))
326 {
327 ceph_assert(result); // must be allocated
328 set_description() << "get omap keys dest=" << obj << " marker=" << marker;
329 }
330
331 int RGWRadosGetOmapKeysCR::send_request(const DoutPrefixProvider *dpp) {
332 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &result->ref);
333 if (r < 0) {
334 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
335 return r;
336 }
337
338 set_status() << "send request";
339
340 librados::ObjectReadOperation op;
341 op.omap_get_keys2(marker, max_entries, &result->entries, &result->more, nullptr);
342
343 cn = stack->create_completion_notifier(result);
344 return result->ref.pool.ioctx().aio_operate(result->ref.obj.oid, cn->completion(), &op, NULL);
345 }
346
347 int RGWRadosGetOmapKeysCR::request_complete()
348 {
349 int r = cn->completion()->get_return_value();
350
351 set_status() << "request complete; ret=" << r;
352
353 return r;
354 }
355
356 RGWRadosGetOmapValsCR::RGWRadosGetOmapValsCR(rgw::sal::RadosStore* _store,
357 const rgw_raw_obj& _obj,
358 const string& _marker,
359 int _max_entries,
360 ResultPtr _result)
361 : RGWSimpleCoroutine(_store->ctx()), store(_store), obj(_obj),
362 marker(_marker), max_entries(_max_entries),
363 result(std::move(_result))
364 {
365 ceph_assert(result); // must be allocated
366 set_description() << "get omap keys dest=" << obj << " marker=" << marker;
367 }
368
369 int RGWRadosGetOmapValsCR::send_request(const DoutPrefixProvider *dpp) {
370 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &result->ref);
371 if (r < 0) {
372 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
373 return r;
374 }
375
376 set_status() << "send request";
377
378 librados::ObjectReadOperation op;
379 op.omap_get_vals2(marker, max_entries, &result->entries, &result->more, nullptr);
380
381 cn = stack->create_completion_notifier(result);
382 return result->ref.pool.ioctx().aio_operate(result->ref.obj.oid, cn->completion(), &op, NULL);
383 }
384
385 int RGWRadosGetOmapValsCR::request_complete()
386 {
387 int r = cn->completion()->get_return_value();
388
389 set_status() << "request complete; ret=" << r;
390
391 return r;
392 }
393
394 RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(rgw::sal::RadosStore* _store,
395 const rgw_raw_obj& _obj,
396 const set<string>& _keys) : RGWSimpleCoroutine(_store->ctx()),
397 store(_store),
398 keys(_keys),
399 obj(_obj), cn(NULL)
400 {
401 set_description() << "remove omap keys dest=" << obj << " keys=" << keys;
402 }
403
404 int RGWRadosRemoveOmapKeysCR::send_request(const DoutPrefixProvider *dpp) {
405 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
406 if (r < 0) {
407 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
408 return r;
409 }
410
411 set_status() << "send request";
412
413 librados::ObjectWriteOperation op;
414 op.omap_rm_keys(keys);
415
416 cn = stack->create_completion_notifier();
417 return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op);
418 }
419
420 int RGWRadosRemoveOmapKeysCR::request_complete()
421 {
422 int r = cn->completion()->get_return_value();
423
424 set_status() << "request complete; ret=" << r;
425
426 return r;
427 }
428
429 RGWRadosRemoveCR::RGWRadosRemoveCR(rgw::sal::RadosStore* store, const rgw_raw_obj& obj,
430 RGWObjVersionTracker* objv_tracker)
431 : RGWSimpleCoroutine(store->ctx()),
432 store(store), obj(obj), objv_tracker(objv_tracker)
433 {
434 set_description() << "remove dest=" << obj;
435 }
436
437 int RGWRadosRemoveCR::send_request(const DoutPrefixProvider *dpp)
438 {
439 auto rados = store->getRados()->get_rados_handle();
440 int r = rados->ioctx_create(obj.pool.name.c_str(), ioctx);
441 if (r < 0) {
442 lderr(cct) << "ERROR: failed to open pool (" << obj.pool.name << ") ret=" << r << dendl;
443 return r;
444 }
445 ioctx.locator_set_key(obj.loc);
446
447 set_status() << "send request";
448
449 librados::ObjectWriteOperation op;
450 if (objv_tracker) {
451 objv_tracker->prepare_op_for_write(&op);
452 }
453 op.remove();
454
455 cn = stack->create_completion_notifier();
456 return ioctx.aio_operate(obj.oid, cn->completion(), &op);
457 }
458
459 int RGWRadosRemoveCR::request_complete()
460 {
461 int r = cn->completion()->get_return_value();
462
463 set_status() << "request complete; ret=" << r;
464
465 return r;
466 }
467
468 RGWRadosRemoveOidCR::RGWRadosRemoveOidCR(rgw::sal::RadosStore* store,
469 librados::IoCtx&& ioctx,
470 std::string_view oid,
471 RGWObjVersionTracker* objv_tracker)
472 : RGWSimpleCoroutine(store->ctx()), ioctx(std::move(ioctx)),
473 oid(std::string(oid)), objv_tracker(objv_tracker)
474 {
475 set_description() << "remove dest=" << oid;
476 }
477
478 RGWRadosRemoveOidCR::RGWRadosRemoveOidCR(rgw::sal::RadosStore* store,
479 RGWSI_RADOS::Obj& obj,
480 RGWObjVersionTracker* objv_tracker)
481 : RGWSimpleCoroutine(store->ctx()),
482 ioctx(librados::IoCtx(obj.get_ref().pool.ioctx())),
483 oid(obj.get_ref().obj.oid),
484 objv_tracker(objv_tracker)
485 {
486 set_description() << "remove dest=" << oid;
487 }
488
489 RGWRadosRemoveOidCR::RGWRadosRemoveOidCR(rgw::sal::RadosStore* store,
490 RGWSI_RADOS::Obj&& obj,
491 RGWObjVersionTracker* objv_tracker)
492 : RGWSimpleCoroutine(store->ctx()),
493 ioctx(std::move(obj.get_ref().pool.ioctx())),
494 oid(std::move(obj.get_ref().obj.oid)),
495 objv_tracker(objv_tracker)
496 {
497 set_description() << "remove dest=" << oid;
498 }
499
500 int RGWRadosRemoveOidCR::send_request(const DoutPrefixProvider *dpp)
501 {
502 librados::ObjectWriteOperation op;
503 if (objv_tracker) {
504 objv_tracker->prepare_op_for_write(&op);
505 }
506 op.remove();
507
508 cn = stack->create_completion_notifier();
509 return ioctx.aio_operate(oid, cn->completion(), &op);
510 }
511
512 int RGWRadosRemoveOidCR::request_complete()
513 {
514 int r = cn->completion()->get_return_value();
515
516 set_status() << "request complete; ret=" << r;
517
518 return r;
519 }
520
521 RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store,
522 const rgw_raw_obj& _obj,
523 const string& _lock_name,
524 const string& _cookie,
525 uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
526 async_rados(_async_rados),
527 store(_store),
528 lock_name(_lock_name),
529 cookie(_cookie),
530 duration(_duration),
531 obj(_obj),
532 req(NULL)
533 {
534 set_description() << "rados lock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie << " duration=" << duration;
535 }
536
537 void RGWSimpleRadosLockCR::request_cleanup()
538 {
539 if (req) {
540 req->finish();
541 req = NULL;
542 }
543 }
544
545 int RGWSimpleRadosLockCR::send_request(const DoutPrefixProvider *dpp)
546 {
547 set_status() << "sending request";
548 req = new RGWAsyncLockSystemObj(this, stack->create_completion_notifier(),
549 store, NULL, obj, lock_name, cookie, duration);
550 async_rados->queue(req);
551 return 0;
552 }
553
554 int RGWSimpleRadosLockCR::request_complete()
555 {
556 set_status() << "request complete; ret=" << req->get_ret_status();
557 return req->get_ret_status();
558 }
559
560 RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store,
561 const rgw_raw_obj& _obj,
562 const string& _lock_name,
563 const string& _cookie) : RGWSimpleCoroutine(_store->ctx()),
564 async_rados(_async_rados),
565 store(_store),
566 lock_name(_lock_name),
567 cookie(_cookie),
568 obj(_obj),
569 req(NULL)
570 {
571 set_description() << "rados unlock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie;
572 }
573
574 void RGWSimpleRadosUnlockCR::request_cleanup()
575 {
576 if (req) {
577 req->finish();
578 req = NULL;
579 }
580 }
581
582 int RGWSimpleRadosUnlockCR::send_request(const DoutPrefixProvider *dpp)
583 {
584 set_status() << "sending request";
585
586 req = new RGWAsyncUnlockSystemObj(this, stack->create_completion_notifier(),
587 store, NULL, obj, lock_name, cookie);
588 async_rados->queue(req);
589 return 0;
590 }
591
592 int RGWSimpleRadosUnlockCR::request_complete()
593 {
594 set_status() << "request complete; ret=" << req->get_ret_status();
595 return req->get_ret_status();
596 }
597
598 int RGWOmapAppend::operate(const DoutPrefixProvider *dpp) {
599 reenter(this) {
600 for (;;) {
601 if (!has_product() && going_down) {
602 set_status() << "going down";
603 break;
604 }
605 set_status() << "waiting for product";
606 yield wait_for_product();
607 yield {
608 string entry;
609 while (consume(&entry)) {
610 set_status() << "adding entry: " << entry;
611 entries[entry] = bufferlist();
612 if (entries.size() >= window_size) {
613 break;
614 }
615 }
616 if (entries.size() >= window_size || going_down) {
617 set_status() << "flushing to omap";
618 call(new RGWRadosSetOmapKeysCR(store, obj, entries));
619 entries.clear();
620 }
621 }
622 if (get_ret_status() < 0) {
623 ldout(cct, 0) << "ERROR: failed to store entries in omap" << dendl;
624 return set_state(RGWCoroutine_Error);
625 }
626 }
627 /* done with coroutine */
628 return set_state(RGWCoroutine_Done);
629 }
630 return 0;
631 }
632
633 void RGWOmapAppend::flush_pending() {
634 receive(pending_entries);
635 num_pending_entries = 0;
636 }
637
638 bool RGWOmapAppend::append(const string& s) {
639 if (is_done()) {
640 return false;
641 }
642 ++total_entries;
643 pending_entries.push_back(s);
644 if (++num_pending_entries >= (int)window_size) {
645 flush_pending();
646 }
647 return true;
648 }
649
650 bool RGWOmapAppend::finish() {
651 going_down = true;
652 flush_pending();
653 set_sleeping(false);
654 return (!is_done());
655 }
656
657 int RGWAsyncGetBucketInstanceInfo::_send_request(const DoutPrefixProvider *dpp)
658 {
659 int r;
660 if (!bucket.bucket_id.empty()) {
661 r = store->getRados()->get_bucket_instance_info(bucket, bucket_info, nullptr, &attrs, null_yield, dpp);
662 } else {
663 r = store->ctl()->bucket->read_bucket_info(bucket, &bucket_info, null_yield, dpp,
664 RGWBucketCtl::BucketInstance::GetParams().set_attrs(&attrs));
665 }
666 if (r < 0) {
667 ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info for "
668 << bucket << dendl;
669 return r;
670 }
671
672 return 0;
673 }
674
675 int RGWAsyncPutBucketInstanceInfo::_send_request(const DoutPrefixProvider *dpp)
676 {
677 auto r = store->getRados()->put_bucket_instance_info(bucket_info, exclusive,
678 mtime, attrs, dpp, null_yield);
679 if (r < 0) {
680 ldpp_dout(dpp, 0) << "ERROR: failed to put bucket instance info for "
681 << bucket_info.bucket << dendl;
682 return r;
683 }
684
685 return 0;
686 }
687
688 RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(
689 const DoutPrefixProvider *dpp,
690 rgw::sal::RadosStore* store,
691 const RGWBucketInfo& bucket_info,
692 int shard_id,
693 const rgw::bucket_index_layout_generation& generation,
694 const std::string& start_marker,
695 const std::string& end_marker)
696 : RGWSimpleCoroutine(store->ctx()), bucket_info(bucket_info),
697 shard_id(shard_id), generation(generation), bs(store->getRados()),
698 start_marker(BucketIndexShardsManager::get_shard_marker(start_marker)),
699 end_marker(BucketIndexShardsManager::get_shard_marker(end_marker))
700 {
701 }
702
703 int RGWRadosBILogTrimCR::send_request(const DoutPrefixProvider *dpp)
704 {
705 int r = bs.init(dpp, bucket_info, generation, shard_id);
706 if (r < 0) {
707 ldpp_dout(dpp, -1) << "ERROR: bucket shard init failed ret=" << r << dendl;
708 return r;
709 }
710
711 bufferlist in;
712 cls_rgw_bi_log_trim_op call;
713 call.start_marker = std::move(start_marker);
714 call.end_marker = std::move(end_marker);
715 encode(call, in);
716
717 librados::ObjectWriteOperation op;
718 op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
719
720 cn = stack->create_completion_notifier();
721 return bs.bucket_obj.aio_operate(cn->completion(), &op);
722 }
723
724 int RGWRadosBILogTrimCR::request_complete()
725 {
726 int r = cn->completion()->get_return_value();
727 set_status() << "request complete; ret=" << r;
728 return r;
729 }
730
731 int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)
732 {
733 RGWObjectCtx obj_ctx(store);
734
735 char buf[16];
736 snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id());
737 rgw::sal::Attrs attrs;
738
739 rgw_obj src_obj(src_bucket, key);
740
741 rgw::sal::RadosBucket dest_bucket(store, dest_bucket_info);
742 rgw::sal::RadosObject dest_obj(store, dest_key.value_or(key), &dest_bucket);
743
744 std::string etag;
745
746 std::optional<uint64_t> bytes_transferred;
747 int r = store->getRados()->fetch_remote_obj(obj_ctx,
748 user_id.value_or(rgw_user()),
749 NULL, /* req_info */
750 source_zone,
751 dest_obj.get_obj(),
752 src_obj,
753 dest_bucket_info, /* dest */
754 nullptr, /* source */
755 dest_placement_rule,
756 nullptr, /* real_time* src_mtime, */
757 NULL, /* real_time* mtime, */
758 NULL, /* const real_time* mod_ptr, */
759 NULL, /* const real_time* unmod_ptr, */
760 false, /* high precision time */
761 NULL, /* const char *if_match, */
762 NULL, /* const char *if_nomatch, */
763 RGWRados::ATTRSMOD_NONE,
764 copy_if_newer,
765 attrs,
766 RGWObjCategory::Main,
767 versioned_epoch,
768 real_time(), /* delete_at */
769 NULL, /* string *ptag, */
770 &etag, /* string *petag, */
771 NULL, /* void (*progress_cb)(off_t, void *), */
772 NULL, /* void *progress_data*); */
773 dpp,
774 filter.get(),
775 source_trace_entry,
776 &zones_trace,
777 &bytes_transferred);
778
779 if (r < 0) {
780 ldpp_dout(dpp, 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
781 if (counters) {
782 counters->inc(sync_counters::l_fetch_err, 1);
783 }
784 } else {
785 // r >= 0
786 if (bytes_transferred) {
787 // send notification that object was succesfully synced
788 std::string user_id = "rgw sync";
789 std::string req_id = "0";
790
791 RGWObjTags obj_tags;
792 auto iter = attrs.find(RGW_ATTR_TAGS);
793 if (iter != attrs.end()) {
794 try {
795 auto it = iter->second.cbegin();
796 obj_tags.decode(it);
797 } catch (buffer::error &err) {
798 ldpp_dout(dpp, 1) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
799 }
800 }
801
802 // NOTE: we create a mutable copy of bucket.get_tenant as the get_notification function expects a std::string&, not const
803 std::string tenant(dest_bucket.get_tenant());
804
805 std::unique_ptr<rgw::sal::Notification> notify
806 = store->get_notification(dpp, &dest_obj, nullptr, rgw::notify::ObjectSyncedCreate,
807 &dest_bucket, user_id,
808 tenant,
809 req_id, null_yield);
810
811 auto notify_res = static_cast<rgw::sal::RadosNotification*>(notify.get())->get_reservation();
812 int ret = rgw::notify::publish_reserve(dpp, rgw::notify::ObjectSyncedCreate, notify_res, &obj_tags);
813 if (ret < 0) {
814 ldpp_dout(dpp, 1) << "ERROR: reserving notification failed, with error: " << ret << dendl;
815 // no need to return, the sync already happened
816 } else {
817 ret = rgw::notify::publish_commit(&dest_obj, dest_obj.get_obj_size(), ceph::real_clock::now(), etag, dest_obj.get_instance(), rgw::notify::ObjectSyncedCreate, notify_res, dpp);
818 if (ret < 0) {
819 ldpp_dout(dpp, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
820 }
821 }
822 }
823
824 if (counters) {
825 if (bytes_transferred) {
826 counters->inc(sync_counters::l_fetch, *bytes_transferred);
827 } else {
828 counters->inc(sync_counters::l_fetch_not_modified);
829 }
830 }
831 }
832 return r;
833 }
834
835 int RGWAsyncStatRemoteObj::_send_request(const DoutPrefixProvider *dpp)
836 {
837 RGWObjectCtx obj_ctx(store);
838
839 string user_id;
840 char buf[16];
841 snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id());
842
843
844 rgw_obj src_obj(src_bucket, key);
845
846 int r = store->getRados()->stat_remote_obj(dpp,
847 obj_ctx,
848 rgw_user(user_id),
849 nullptr, /* req_info */
850 source_zone,
851 src_obj,
852 nullptr, /* source */
853 pmtime, /* real_time* src_mtime, */
854 psize, /* uint64_t * */
855 nullptr, /* const real_time* mod_ptr, */
856 nullptr, /* const real_time* unmod_ptr, */
857 true, /* high precision time */
858 nullptr, /* const char *if_match, */
859 nullptr, /* const char *if_nomatch, */
860 pattrs,
861 pheaders,
862 nullptr,
863 nullptr, /* string *ptag, */
864 petag); /* string *petag, */
865
866 if (r < 0) {
867 ldpp_dout(dpp, 0) << "store->stat_remote_obj() returned r=" << r << dendl;
868 }
869 return r;
870 }
871
872
873 int RGWAsyncRemoveObj::_send_request(const DoutPrefixProvider *dpp)
874 {
875 ldpp_dout(dpp, 0) << __func__ << "(): deleting obj=" << obj << dendl;
876
877 obj->set_atomic();
878
879 RGWObjState *state;
880
881 int ret = obj->get_obj_state(dpp, &state, null_yield);
882 if (ret < 0) {
883 ldpp_dout(dpp, 20) << __func__ << "(): get_obj_state() obj=" << obj << " returned ret=" << ret << dendl;
884 return ret;
885 }
886
887 /* has there been any racing object write? */
888 if (del_if_older && (state->mtime > timestamp)) {
889 ldpp_dout(dpp, 20) << __func__ << "(): skipping object removal obj=" << obj << " (obj mtime=" << state->mtime << ", request timestamp=" << timestamp << ")" << dendl;
890 return 0;
891 }
892
893 RGWAccessControlPolicy policy;
894
895 /* decode policy */
896 map<string, bufferlist>::iterator iter = state->attrset.find(RGW_ATTR_ACL);
897 if (iter != state->attrset.end()) {
898 auto bliter = iter->second.cbegin();
899 try {
900 policy.decode(bliter);
901 } catch (buffer::error& err) {
902 ldpp_dout(dpp, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
903 return -EIO;
904 }
905 }
906
907 std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = obj->get_delete_op();
908
909 del_op->params.bucket_owner = bucket->get_info().owner;
910 del_op->params.obj_owner = policy.get_owner();
911 if (del_if_older) {
912 del_op->params.unmod_since = timestamp;
913 }
914 if (versioned) {
915 del_op->params.versioning_status = BUCKET_VERSIONED;
916 }
917 del_op->params.olh_epoch = versioned_epoch;
918 del_op->params.marker_version_id = marker_version_id;
919 del_op->params.obj_owner.set_id(rgw_user(owner));
920 del_op->params.obj_owner.set_name(owner_display_name);
921 del_op->params.mtime = timestamp;
922 del_op->params.high_precision_time = true;
923 del_op->params.zones_trace = &zones_trace;
924
925 ret = del_op->delete_obj(dpp, null_yield);
926 if (ret < 0) {
927 ldpp_dout(dpp, 20) << __func__ << "(): delete_obj() obj=" << obj << " returned ret=" << ret << dendl;
928 }
929 return ret;
930 }
931
932 int RGWContinuousLeaseCR::operate(const DoutPrefixProvider *dpp)
933 {
934 if (aborted) {
935 caller->set_sleeping(false);
936 return set_cr_done();
937 }
938 reenter(this) {
939 last_renew_try_time = ceph::coarse_mono_clock::now();
940 while (!going_down) {
941 current_time = ceph::coarse_mono_clock::now();
942 yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
943 if (latency) {
944 latency->add_latency(ceph::coarse_mono_clock::now() - current_time);
945 }
946 current_time = ceph::coarse_mono_clock::now();
947 if (current_time - last_renew_try_time > interval_tolerance) {
948 // renewal should happen between 50%-90% of interval
949 ldout(store->ctx(), 1) << *this << ": WARNING: did not renew lock " << obj << ":" << lock_name << ": within 90\% of interval. " <<
950 (current_time - last_renew_try_time) << " > " << interval_tolerance << dendl;
951 }
952 last_renew_try_time = current_time;
953
954 caller->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
955 if (retcode < 0) {
956 set_locked(false);
957 ldout(store->ctx(), 20) << *this << ": couldn't lock " << obj << ":" << lock_name << ": retcode=" << retcode << dendl;
958 return set_state(RGWCoroutine_Error, retcode);
959 }
960 ldout(store->ctx(), 20) << *this << ": successfully locked " << obj << ":" << lock_name << dendl;
961 set_locked(true);
962 yield wait(utime_t(interval / 2, 0));
963 }
964 set_locked(false); /* moot at this point anyway */
965 current_time = ceph::coarse_mono_clock::now();
966 yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie));
967 if (latency) {
968 latency->add_latency(ceph::coarse_mono_clock::now() - current_time);
969 }
970 return set_state(RGWCoroutine_Done);
971 }
972 return 0;
973 }
974
975 RGWRadosTimelogAddCR::RGWRadosTimelogAddCR(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const string& _oid,
976 const cls_log_entry& entry) : RGWSimpleCoroutine(_store->ctx()),
977 dpp(_dpp),
978 store(_store),
979 oid(_oid), cn(NULL)
980 {
981 stringstream& s = set_description();
982 s << "timelog add entry oid=" << oid << "entry={id=" << entry.id << ", section=" << entry.section << ", name=" << entry.name << "}";
983 entries.push_back(entry);
984 }
985
986 int RGWRadosTimelogAddCR::send_request(const DoutPrefixProvider *dpp)
987 {
988 set_status() << "sending request";
989
990 cn = stack->create_completion_notifier();
991 return store->svc()->cls->timelog.add(dpp, oid, entries, cn->completion(), true, null_yield);
992 }
993
994 int RGWRadosTimelogAddCR::request_complete()
995 {
996 int r = cn->completion()->get_return_value();
997
998 set_status() << "request complete; ret=" << r;
999
1000 return r;
1001 }
1002
1003 RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(const DoutPrefixProvider *dpp,
1004 rgw::sal::RadosStore* store,
1005 const std::string& oid,
1006 const real_time& start_time,
1007 const real_time& end_time,
1008 const std::string& from_marker,
1009 const std::string& to_marker)
1010 : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store), oid(oid),
1011 start_time(start_time), end_time(end_time),
1012 from_marker(from_marker), to_marker(to_marker)
1013 {
1014 set_description() << "timelog trim oid=" << oid
1015 << " start_time=" << start_time << " end_time=" << end_time
1016 << " from_marker=" << from_marker << " to_marker=" << to_marker;
1017 }
1018
1019 int RGWRadosTimelogTrimCR::send_request(const DoutPrefixProvider *dpp)
1020 {
1021 set_status() << "sending request";
1022
1023 cn = stack->create_completion_notifier();
1024 return store->svc()->cls->timelog.trim(dpp, oid, start_time, end_time, from_marker,
1025 to_marker, cn->completion(),
1026 null_yield);
1027 }
1028
1029 int RGWRadosTimelogTrimCR::request_complete()
1030 {
1031 int r = cn->completion()->get_return_value();
1032
1033 set_status() << "request complete; ret=" << r;
1034
1035 return r;
1036 }
1037
1038
1039 RGWSyncLogTrimCR::RGWSyncLogTrimCR(const DoutPrefixProvider *dpp,
1040 rgw::sal::RadosStore* store, const std::string& oid,
1041 const std::string& to_marker,
1042 std::string *last_trim_marker)
1043 : RGWRadosTimelogTrimCR(dpp, store, oid, real_time{}, real_time{},
1044 std::string{}, to_marker),
1045 cct(store->ctx()), last_trim_marker(last_trim_marker)
1046 {
1047 }
1048
1049 int RGWSyncLogTrimCR::request_complete()
1050 {
1051 int r = RGWRadosTimelogTrimCR::request_complete();
1052 if (r != -ENODATA) {
1053 return r;
1054 }
1055 // nothing left to trim, update last_trim_marker
1056 if (*last_trim_marker < to_marker && to_marker != max_marker) {
1057 *last_trim_marker = to_marker;
1058 }
1059 return 0;
1060 }
1061
1062
1063 int RGWAsyncStatObj::_send_request(const DoutPrefixProvider *dpp)
1064 {
1065 rgw_raw_obj raw_obj;
1066 store->getRados()->obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
1067 return store->getRados()->raw_obj_stat(dpp, raw_obj, psize, pmtime, pepoch,
1068 nullptr, nullptr, objv_tracker, null_yield);
1069 }
1070
1071 RGWStatObjCR::RGWStatObjCR(const DoutPrefixProvider *dpp,
1072 RGWAsyncRadosProcessor *async_rados, rgw::sal::RadosStore* store,
1073 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize,
1074 real_time* pmtime, uint64_t *pepoch,
1075 RGWObjVersionTracker *objv_tracker)
1076 : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store), async_rados(async_rados),
1077 bucket_info(_bucket_info), obj(obj), psize(psize), pmtime(pmtime), pepoch(pepoch),
1078 objv_tracker(objv_tracker)
1079 {
1080 }
1081
1082 void RGWStatObjCR::request_cleanup()
1083 {
1084 if (req) {
1085 req->finish();
1086 req = NULL;
1087 }
1088 }
1089
1090 int RGWStatObjCR::send_request(const DoutPrefixProvider *dpp)
1091 {
1092 req = new RGWAsyncStatObj(dpp, this, stack->create_completion_notifier(),
1093 store, bucket_info, obj, psize, pmtime, pepoch, objv_tracker);
1094 async_rados->queue(req);
1095 return 0;
1096 }
1097
1098 int RGWStatObjCR::request_complete()
1099 {
1100 return req->get_ret_status();
1101 }
1102
1103 RGWRadosNotifyCR::RGWRadosNotifyCR(rgw::sal::RadosStore* store, const rgw_raw_obj& obj,
1104 bufferlist& request, uint64_t timeout_ms,
1105 bufferlist *response)
1106 : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj),
1107 request(request), timeout_ms(timeout_ms), response(response)
1108 {
1109 set_description() << "notify dest=" << obj;
1110 }
1111
1112 int RGWRadosNotifyCR::send_request(const DoutPrefixProvider *dpp)
1113 {
1114 int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
1115 if (r < 0) {
1116 ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
1117 return r;
1118 }
1119
1120 set_status() << "sending request";
1121
1122 cn = stack->create_completion_notifier();
1123 return ref.pool.ioctx().aio_notify(ref.obj.oid, cn->completion(), request,
1124 timeout_ms, response);
1125 }
1126
1127 int RGWRadosNotifyCR::request_complete()
1128 {
1129 int r = cn->completion()->get_return_value();
1130
1131 set_status() << "request complete; ret=" << r;
1132
1133 return r;
1134 }
1135
1136
1137 int RGWDataPostNotifyCR::operate(const DoutPrefixProvider* dpp)
1138 {
1139 reenter(this) {
1140 using PostNotify2 = RGWPostRESTResourceCR<bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>>, int>;
1141 yield {
1142 rgw_http_param_pair pairs[] = { { "type", "data" },
1143 { "notify2", NULL },
1144 { "source-zone", source_zone },
1145 { NULL, NULL } };
1146 call(new PostNotify2(store->ctx(), conn, &http_manager, "/admin/log", pairs, shards, nullptr));
1147 }
1148 if (retcode == -ERR_METHOD_NOT_ALLOWED) {
1149 using PostNotify1 = RGWPostRESTResourceCR<rgw_data_notify_v1_encoder, int>;
1150 yield {
1151 rgw_http_param_pair pairs[] = { { "type", "data" },
1152 { "notify", NULL },
1153 { "source-zone", source_zone },
1154 { NULL, NULL } };
1155 auto encoder = rgw_data_notify_v1_encoder{shards};
1156 call(new PostNotify1(store->ctx(), conn, &http_manager, "/admin/log", pairs, encoder, nullptr));
1157 }
1158 }
1159 if (retcode < 0) {
1160 return set_cr_error(retcode);
1161 }
1162 return set_cr_done();
1163 }
1164 return 0;
1165 }