1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
11 #include <boost/algorithm/string/split.hpp>
12 #include <boost/algorithm/string.hpp>
13 #include <boost/algorithm/string/predicate.hpp>
14 #include <boost/variant.hpp>
16 #include "include/scope_guard.h"
17 #include "include/function2.hpp"
18 #include "common/Formatter.h"
19 #include "common/containers.h"
20 #include "common/split.h"
21 #include <common/errno.h>
22 #include "include/random.h"
23 #include "cls/lock/cls_lock_client.h"
24 #include "rgw_perf_counters.h"
25 #include "rgw_common.h"
26 #include "rgw_bucket.h"
29 #include "rgw_string.h"
30 #include "rgw_multi.h"
32 #include "rgw_lc_tier.h"
33 #include "rgw_notify.h"
35 #include "fmt/format.h"
37 #include "services/svc_sys_obj.h"
38 #include "services/svc_zone.h"
39 #include "services/svc_tier_rados.h"
41 #define dout_context g_ceph_context
42 #define dout_subsys ceph_subsys_rgw
46 const char* LC_STATUS
[] = {
53 using namespace librados
;
55 bool LCRule::valid() const
57 if (id
.length() > MAX_ID_LEN
) {
60 else if(expiration
.empty() && noncur_expiration
.empty() &&
61 mp_expiration
.empty() && !dm_expiration
&&
62 transitions
.empty() && noncur_transitions
.empty()) {
65 else if (!expiration
.valid() || !noncur_expiration
.valid() ||
66 !mp_expiration
.valid()) {
69 if (!transitions
.empty()) {
70 bool using_days
= expiration
.has_days();
71 bool using_date
= expiration
.has_date();
72 for (const auto& elem
: transitions
) {
73 if (!elem
.second
.valid()) {
76 using_days
= using_days
|| elem
.second
.has_days();
77 using_date
= using_date
|| elem
.second
.has_date();
78 if (using_days
&& using_date
) {
83 for (const auto& elem
: noncur_transitions
) {
84 if (!elem
.second
.valid()) {
92 void LCRule::init_simple_days_rule(std::string_view _id
,
93 std::string_view _prefix
, int num_days
)
98 snprintf(buf
, sizeof(buf
), "%d", num_days
);
99 expiration
.set_days(buf
);
103 void RGWLifecycleConfiguration::add_rule(const LCRule
& rule
)
105 auto& id
= rule
.get_id(); // note that this will return false for groups, but that's ok, we won't search groups
106 rule_map
.insert(pair
<string
, LCRule
>(id
, rule
));
109 bool RGWLifecycleConfiguration::_add_rule(const LCRule
& rule
)
111 lc_op
op(rule
.get_id());
112 op
.status
= rule
.is_enabled();
113 if (rule
.get_expiration().has_days()) {
114 op
.expiration
= rule
.get_expiration().get_days();
116 if (rule
.get_expiration().has_date()) {
117 op
.expiration_date
= ceph::from_iso_8601(rule
.get_expiration().get_date());
119 if (rule
.get_noncur_expiration().has_days()) {
120 op
.noncur_expiration
= rule
.get_noncur_expiration().get_days();
122 if (rule
.get_mp_expiration().has_days()) {
123 op
.mp_expiration
= rule
.get_mp_expiration().get_days();
125 op
.dm_expiration
= rule
.get_dm_expiration();
126 for (const auto &elem
: rule
.get_transitions()) {
127 transition_action action
;
128 if (elem
.second
.has_days()) {
129 action
.days
= elem
.second
.get_days();
131 action
.date
= ceph::from_iso_8601(elem
.second
.get_date());
134 = rgw_placement_rule::get_canonical_storage_class(elem
.first
);
135 op
.transitions
.emplace(elem
.first
, std::move(action
));
137 for (const auto &elem
: rule
.get_noncur_transitions()) {
138 transition_action action
;
139 action
.days
= elem
.second
.get_days();
140 action
.date
= ceph::from_iso_8601(elem
.second
.get_date());
142 = rgw_placement_rule::get_canonical_storage_class(elem
.first
);
143 op
.noncur_transitions
.emplace(elem
.first
, std::move(action
));
146 if (rule
.get_filter().has_prefix()){
147 prefix
= rule
.get_filter().get_prefix();
149 prefix
= rule
.get_prefix();
151 if (rule
.get_filter().has_tags()){
152 op
.obj_tags
= rule
.get_filter().get_tags();
154 op
.rule_flags
= rule
.get_filter().get_flags();
155 prefix_map
.emplace(std::move(prefix
), std::move(op
));
159 int RGWLifecycleConfiguration::check_and_add_rule(const LCRule
& rule
)
164 auto& id
= rule
.get_id();
165 if (rule_map
.find(id
) != rule_map
.end()) { //id shouldn't be the same
168 if (rule
.get_filter().has_tags() && (rule
.get_dm_expiration() ||
169 !rule
.get_mp_expiration().empty())) {
170 return -ERR_INVALID_REQUEST
;
172 rule_map
.insert(pair
<string
, LCRule
>(id
, rule
));
174 if (!_add_rule(rule
)) {
175 return -ERR_INVALID_REQUEST
;
180 bool RGWLifecycleConfiguration::has_same_action(const lc_op
& first
,
181 const lc_op
& second
) {
182 if ((first
.expiration
> 0 || first
.expiration_date
!= boost::none
) &&
183 (second
.expiration
> 0 || second
.expiration_date
!= boost::none
)) {
185 } else if (first
.noncur_expiration
> 0 && second
.noncur_expiration
> 0) {
187 } else if (first
.mp_expiration
> 0 && second
.mp_expiration
> 0) {
189 } else if (!first
.transitions
.empty() && !second
.transitions
.empty()) {
190 for (auto &elem
: first
.transitions
) {
191 if (second
.transitions
.find(elem
.first
) != second
.transitions
.end()) {
195 } else if (!first
.noncur_transitions
.empty() &&
196 !second
.noncur_transitions
.empty()) {
197 for (auto &elem
: first
.noncur_transitions
) {
198 if (second
.noncur_transitions
.find(elem
.first
) !=
199 second
.noncur_transitions
.end()) {
207 /* Formerly, this method checked for duplicate rules using an invalid
208 * method (prefix uniqueness). */
209 bool RGWLifecycleConfiguration::valid()
214 void *RGWLC::LCWorker::entry() {
216 std::unique_ptr
<rgw::sal::Bucket
> all_buckets
; // empty restriction
217 utime_t start
= ceph_clock_now();
218 if (should_work(start
)) {
219 ldpp_dout(dpp
, 2) << "life cycle: start" << dendl
;
220 int r
= lc
->process(this, all_buckets
, false /* once */);
222 ldpp_dout(dpp
, 0) << "ERROR: do life cycle process() returned error r="
225 ldpp_dout(dpp
, 2) << "life cycle: stop" << dendl
;
226 cloud_targets
.clear(); // clear cloud targets
228 if (lc
->going_down())
231 utime_t end
= ceph_clock_now();
232 int secs
= schedule_next_start_time(start
, end
);
234 next
.set_from_double(end
+ secs
);
236 ldpp_dout(dpp
, 5) << "schedule life cycle next start time: "
237 << rgw_to_asctime(next
) << dendl
;
239 std::unique_lock l
{lock
};
240 cond
.wait_for(l
, std::chrono::seconds(secs
));
241 } while (!lc
->going_down());
246 void RGWLC::initialize(CephContext
*_cct
, rgw::sal::Driver
* _driver
) {
249 sal_lc
= driver
->get_lifecycle();
250 max_objs
= cct
->_conf
->rgw_lc_max_objs
;
251 if (max_objs
> HASH_PRIME
)
252 max_objs
= HASH_PRIME
;
254 obj_names
= new string
[max_objs
];
256 for (int i
= 0; i
< max_objs
; i
++) {
257 obj_names
[i
] = lc_oid_prefix
;
259 snprintf(buf
, 32, ".%d", i
);
260 obj_names
[i
].append(buf
);
263 #define COOKIE_LEN 16
264 char cookie_buf
[COOKIE_LEN
+ 1];
265 gen_rand_alphanumeric(cct
, cookie_buf
, sizeof(cookie_buf
) - 1);
269 void RGWLC::finalize()
274 static inline std::ostream
& operator<<(std::ostream
&os
, rgw::sal::Lifecycle::LCEntry
& ent
) {
275 os
<< "<ent: bucket=";
276 os
<< ent
.get_bucket();
277 os
<< "; start_time=";
278 os
<< rgw_to_asctime(utime_t(time_t(ent
.get_start_time()), 0));
280 os
<< LC_STATUS
[ent
.get_status()];
285 static bool obj_has_expired(const DoutPrefixProvider
*dpp
, CephContext
*cct
, ceph::real_time mtime
, int days
,
286 ceph::real_time
*expire_time
= nullptr)
288 double timediff
, cmp
;
290 if (cct
->_conf
->rgw_lc_debug_interval
<= 0) {
291 /* Normal case, run properly */
292 cmp
= double(days
)*24*60*60;
293 base_time
= ceph_clock_now().round_to_day();
295 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
296 cmp
= double(days
)*cct
->_conf
->rgw_lc_debug_interval
;
297 base_time
= ceph_clock_now();
299 auto tt_mtime
= ceph::real_clock::to_time_t(mtime
);
300 timediff
= base_time
- tt_mtime
;
303 *expire_time
= mtime
+ make_timespan(cmp
);
306 ldpp_dout(dpp
, 20) << __func__
307 << "(): mtime=" << mtime
<< " days=" << days
308 << " base_time=" << base_time
<< " timediff=" << timediff
310 << " is_expired=" << (timediff
>= cmp
)
313 return (timediff
>= cmp
);
316 static bool pass_object_lock_check(rgw::sal::Driver
* driver
, rgw::sal::Object
* obj
, const DoutPrefixProvider
*dpp
)
318 if (!obj
->get_bucket()->get_info().obj_lock_enabled()) {
321 std::unique_ptr
<rgw::sal::Object::ReadOp
> read_op
= obj
->get_read_op();
322 int ret
= read_op
->prepare(null_yield
, dpp
);
324 if (ret
== -ENOENT
) {
330 auto iter
= obj
->get_attrs().find(RGW_ATTR_OBJECT_RETENTION
);
331 if (iter
!= obj
->get_attrs().end()) {
332 RGWObjectRetention retention
;
334 decode(retention
, iter
->second
);
335 } catch (buffer::error
& err
) {
336 ldpp_dout(dpp
, 0) << "ERROR: failed to decode RGWObjectRetention"
340 if (ceph::real_clock::to_time_t(retention
.get_retain_until_date()) >
345 iter
= obj
->get_attrs().find(RGW_ATTR_OBJECT_LEGAL_HOLD
);
346 if (iter
!= obj
->get_attrs().end()) {
347 RGWObjectLegalHold obj_legal_hold
;
349 decode(obj_legal_hold
, iter
->second
);
350 } catch (buffer::error
& err
) {
351 ldpp_dout(dpp
, 0) << "ERROR: failed to decode RGWObjectLegalHold"
355 if (obj_legal_hold
.is_enabled()) {
364 rgw::sal::Driver
* driver
;
365 rgw::sal::Bucket
* bucket
;
366 rgw::sal::Bucket::ListParams list_params
;
367 rgw::sal::Bucket::ListResults list_results
;
369 vector
<rgw_bucket_dir_entry
>::iterator obj_iter
;
370 rgw_bucket_dir_entry pre_obj
;
374 LCObjsLister(rgw::sal::Driver
* _driver
, rgw::sal::Bucket
* _bucket
) :
375 driver(_driver
), bucket(_bucket
) {
376 list_params
.list_versions
= bucket
->versioned();
377 list_params
.allow_unordered
= true;
378 delay_ms
= driver
->ctx()->_conf
.get_val
<int64_t>("rgw_lc_thread_delay");
381 void set_prefix(const string
& p
) {
383 list_params
.prefix
= prefix
;
386 int init(const DoutPrefixProvider
*dpp
) {
390 int fetch(const DoutPrefixProvider
*dpp
) {
391 int ret
= bucket
->list(dpp
, list_params
, 1000, list_results
, null_yield
);
396 obj_iter
= list_results
.objs
.begin();
402 std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms
));
405 bool get_obj(const DoutPrefixProvider
*dpp
, rgw_bucket_dir_entry
**obj
,
406 std::function
<void(void)> fetch_barrier
407 = []() { /* nada */}) {
408 if (obj_iter
== list_results
.objs
.end()) {
409 if (!list_results
.is_truncated
) {
414 list_params
.marker
= pre_obj
.key
;
415 int ret
= fetch(dpp
);
417 ldpp_dout(dpp
, 0) << "ERROR: list_op returned ret=" << ret
424 /* returning address of entry in objs */
426 return obj_iter
!= list_results
.objs
.end();
429 rgw_bucket_dir_entry
get_prev_obj() {
438 boost::optional
<std::string
> next_key_name() {
439 if (obj_iter
== list_results
.objs
.end() ||
440 (obj_iter
+ 1) == list_results
.objs
.end()) {
441 /* this should have been called after get_obj() was called, so this should
442 * only happen if is_truncated is false */
446 return ((obj_iter
+ 1)->key
.name
);
449 }; /* LCObjsLister */
453 using LCWorker
= RGWLC::LCWorker
;
456 rgw::sal::Driver
* driver
;
458 rgw::sal::Bucket
* bucket
;
461 op_env(lc_op
& _op
, rgw::sal::Driver
* _driver
, LCWorker
* _worker
,
462 rgw::sal::Bucket
* _bucket
, LCObjsLister
& _ol
)
463 : op(_op
), driver(_driver
), worker(_worker
), bucket(_bucket
),
473 rgw_bucket_dir_entry o
;
474 boost::optional
<std::string
> next_key_name
;
475 ceph::real_time effective_mtime
;
477 rgw::sal::Driver
* driver
;
478 rgw::sal::Bucket
* bucket
;
479 lc_op
& op
; // ok--refers to expanded env.op
482 std::unique_ptr
<rgw::sal::Object
> obj
;
484 const DoutPrefixProvider
*dpp
;
487 std::unique_ptr
<rgw::sal::PlacementTier
> tier
;
489 lc_op_ctx(op_env
& env
, rgw_bucket_dir_entry
& o
,
490 boost::optional
<std::string
> next_key_name
,
491 ceph::real_time effective_mtime
,
492 const DoutPrefixProvider
*dpp
, WorkQ
* wq
)
493 : cct(env
.driver
->ctx()), env(env
), o(o
), next_key_name(next_key_name
),
494 effective_mtime(effective_mtime
),
495 driver(env
.driver
), bucket(env
.bucket
), op(env
.op
), ol(env
.ol
),
496 rctx(env
.driver
), dpp(dpp
), wq(wq
)
498 obj
= bucket
->get_object(o
.key
);
501 bool next_has_same_name(const std::string
& key_name
) {
502 return (next_key_name
&& key_name
.compare(
503 boost::get
<std::string
>(next_key_name
)) == 0);
509 static std::string lc_id
= "rgw lifecycle";
510 static std::string lc_req_id
= "0";
512 static int remove_expired_obj(
513 const DoutPrefixProvider
*dpp
, lc_op_ctx
& oc
, bool remove_indeed
,
514 rgw::notify::EventType event_type
)
516 auto& driver
= oc
.driver
;
517 auto& bucket_info
= oc
.bucket
->get_info();
519 auto obj_key
= o
.key
;
522 std::string version_id
;
523 std::unique_ptr
<rgw::sal::Notification
> notify
;
525 if (!remove_indeed
) {
526 obj_key
.instance
.clear();
527 } else if (obj_key
.instance
.empty()) {
528 obj_key
.instance
= "null";
531 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
532 std::unique_ptr
<rgw::sal::Object
> obj
;
534 ret
= driver
->get_bucket(nullptr, bucket_info
, &bucket
);
539 // XXXX currently, rgw::sal::Bucket.owner is always null here
540 std::unique_ptr
<rgw::sal::User
> user
;
541 if (! bucket
->get_owner()) {
542 auto& bucket_info
= bucket
->get_info();
543 user
= driver
->get_user(bucket_info
.owner
);
546 bucket
->set_owner(user
.get());
550 obj
= bucket
->get_object(obj_key
);
552 RGWObjState
* obj_state
{nullptr};
553 ret
= obj
->get_obj_state(dpp
, &obj_state
, null_yield
, true);
558 std::unique_ptr
<rgw::sal::Object::DeleteOp
> del_op
559 = obj
->get_delete_op();
560 del_op
->params
.versioning_status
561 = obj
->get_bucket()->get_info().versioning_status();
562 del_op
->params
.obj_owner
.set_id(rgw_user
{meta
.owner
});
563 del_op
->params
.obj_owner
.set_name(meta
.owner_display_name
);
564 del_op
->params
.bucket_owner
.set_id(bucket_info
.owner
);
565 del_op
->params
.unmod_since
= meta
.mtime
;
566 del_op
->params
.marker_version_id
= version_id
;
568 // notification supported only for RADOS driver for now
569 notify
= driver
->get_notification(dpp
, obj
.get(), nullptr, event_type
,
571 const_cast<std::string
&>(oc
.bucket
->get_tenant()),
572 lc_req_id
, null_yield
);
574 ret
= notify
->publish_reserve(dpp
, nullptr);
577 << "ERROR: notify reservation failed, deferring delete of object k="
582 ret
= del_op
->delete_obj(dpp
, null_yield
);
585 "ERROR: publishing notification failed, with error: " << ret
<< dendl
;
587 // send request to notification manager
588 (void) notify
->publish_commit(dpp
, obj_state
->size
,
589 ceph::real_clock::now(),
590 obj_state
->attrset
[RGW_ATTR_ETAG
].to_str(),
596 } /* remove_expired_obj */
600 virtual ~LCOpAction() {}
602 virtual bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
, const DoutPrefixProvider
*dpp
) {
606 /* called after check(). Check should tell us whether this action
607 * is applicable. If there are multiple actions, we'll end up executing
608 * the latest applicable action
610 * one action after 10 days, another after 20, third after 40.
611 * After 10 days, the latest applicable action would be the first one,
612 * after 20 days it will be the second one. After 21 days it will still be the
613 * second one. So check() should return true for the second action at that point,
614 * but should_process() if the action has already been applied. In object removal
615 * it doesn't matter, but in object transition it does.
617 virtual bool should_process() {
621 virtual int process(lc_op_ctx
& oc
) {
625 friend class LCOpRule
;
630 virtual ~LCOpFilter() {}
631 virtual bool check(const DoutPrefixProvider
*dpp
, lc_op_ctx
& oc
) {
637 friend class LCOpAction
;
640 boost::optional
<std::string
> next_key_name
;
641 ceph::real_time effective_mtime
;
643 std::vector
<shared_ptr
<LCOpFilter
> > filters
; // n.b., sharing ovhd
644 std::vector
<shared_ptr
<LCOpAction
> > actions
;
647 LCOpRule(op_env
& _env
) : env(_env
) {}
649 boost::optional
<std::string
> get_next_key_name() {
650 return next_key_name
;
653 std::vector
<shared_ptr
<LCOpAction
>>& get_actions() {
659 int process(rgw_bucket_dir_entry
& o
, const DoutPrefixProvider
*dpp
,
664 boost::variant
<void*,
665 /* out-of-line delete */
666 std::tuple
<LCOpRule
, rgw_bucket_dir_entry
>,
667 /* uncompleted MPU expiration */
668 std::tuple
<lc_op
, rgw_bucket_dir_entry
>,
669 rgw_bucket_dir_entry
>;
671 class WorkQ
: public Thread
674 using unique_lock
= std::unique_lock
<std::mutex
>;
675 using work_f
= std::function
<void(RGWLC::LCWorker
*, WorkQ
*, WorkItem
&)>;
676 using dequeue_result
= boost::variant
<void*, WorkItem
>;
678 static constexpr uint32_t FLAG_NONE
= 0x0000;
679 static constexpr uint32_t FLAG_EWAIT_SYNC
= 0x0001;
680 static constexpr uint32_t FLAG_DWAIT_SYNC
= 0x0002;
681 static constexpr uint32_t FLAG_EDRAIN_SYNC
= 0x0004;
684 const work_f bsf
= [](RGWLC::LCWorker
* wk
, WorkQ
* wq
, WorkItem
& wi
) {};
689 std::condition_variable cv
;
691 vector
<WorkItem
> items
;
695 WorkQ(RGWLC::LCWorker
* wk
, uint32_t ix
, uint32_t qmax
)
696 : wk(wk
), qmax(qmax
), ix(ix
), flags(FLAG_NONE
), f(bsf
)
698 create(thr_name().c_str());
701 std::string
thr_name() {
702 return std::string
{"wp_thrd: "}
703 + std::to_string(wk
->ix
) + ", " + std::to_string(ix
);
706 void setf(work_f _f
) {
710 void enqueue(WorkItem
&& item
) {
711 unique_lock
uniq(mtx
);
712 while ((!wk
->get_lc()->going_down()) &&
713 (items
.size() > qmax
)) {
714 flags
|= FLAG_EWAIT_SYNC
;
715 cv
.wait_for(uniq
, 200ms
);
717 items
.push_back(item
);
718 if (flags
& FLAG_DWAIT_SYNC
) {
719 flags
&= ~FLAG_DWAIT_SYNC
;
725 unique_lock
uniq(mtx
);
726 flags
|= FLAG_EDRAIN_SYNC
;
727 while (flags
& FLAG_EDRAIN_SYNC
) {
728 cv
.wait_for(uniq
, 200ms
);
733 dequeue_result
dequeue() {
734 unique_lock
uniq(mtx
);
735 while ((!wk
->get_lc()->going_down()) &&
736 (items
.size() == 0)) {
737 /* clear drain state, as we are NOT doing work and qlen==0 */
738 if (flags
& FLAG_EDRAIN_SYNC
) {
739 flags
&= ~FLAG_EDRAIN_SYNC
;
741 flags
|= FLAG_DWAIT_SYNC
;
742 cv
.wait_for(uniq
, 200ms
);
744 if (items
.size() > 0) {
745 auto item
= items
.back();
747 if (flags
& FLAG_EWAIT_SYNC
) {
748 flags
&= ~FLAG_EWAIT_SYNC
;
756 void* entry() override
{
757 while (!wk
->get_lc()->going_down()) {
758 auto item
= dequeue();
759 if (item
.which() == 0) {
763 f(wk
, this, boost::get
<WorkItem
>(item
));
769 class RGWLC::WorkPool
771 using TVector
= ceph::containers::tiny_vector
<WorkQ
, 3>;
776 WorkPool(RGWLC::LCWorker
* wk
, uint16_t n_threads
, uint32_t qmax
)
779 [&](const size_t ix
, auto emplacer
) {
780 emplacer
.emplace(wk
, ix
, qmax
);
786 for (auto& wq
: wqs
) {
791 void setf(WorkQ::work_f _f
) {
792 for (auto& wq
: wqs
) {
797 void enqueue(WorkItem item
) {
799 ix
= (ix
+1) % wqs
.size();
800 (wqs
[tix
]).enqueue(std::move(item
));
804 for (auto& wq
: wqs
) {
810 RGWLC::LCWorker::LCWorker(const DoutPrefixProvider
* dpp
, CephContext
*cct
,
812 : dpp(dpp
), cct(cct
), lc(lc
), ix(ix
)
814 auto wpw
= cct
->_conf
.get_val
<int64_t>("rgw_lc_max_wp_worker");
815 workpool
= new WorkPool(this, wpw
, 512);
static inline bool worker_should_stop(time_t stop_at, bool once)
{
  // A run-once invocation ignores the interval budget entirely; a normal
  // run stops as soon as the wall clock passes the budget deadline.
  if (once) {
    return false;
  }
  return stop_at < time(nullptr);
}
823 int RGWLC::handle_multipart_expiration(rgw::sal::Bucket
* target
,
824 const multimap
<string
, lc_op
>& prefix_map
,
825 LCWorker
* worker
, time_t stop_at
, bool once
)
827 MultipartMetaFilter mp_filter
;
829 rgw::sal::Bucket::ListParams params
;
830 rgw::sal::Bucket::ListResults results
;
831 auto delay_ms
= cct
->_conf
.get_val
<int64_t>("rgw_lc_thread_delay");
832 params
.list_versions
= false;
833 /* lifecycle processing does not depend on total order, so can
834 * take advantage of unordered listing optimizations--such as
835 * operating on one shard at a time */
836 params
.allow_unordered
= true;
837 params
.ns
= RGW_OBJ_NS_MULTIPART
;
838 params
.access_list_filter
= &mp_filter
;
840 auto pf
= [&](RGWLC::LCWorker
* wk
, WorkQ
* wq
, WorkItem
& wi
) {
841 auto wt
= boost::get
<std::tuple
<lc_op
, rgw_bucket_dir_entry
>>(wi
);
842 auto& [rule
, obj
] = wt
;
843 if (obj_has_expired(this, cct
, obj
.meta
.mtime
, rule
.mp_expiration
)) {
844 rgw_obj_key
key(obj
.key
);
845 std::unique_ptr
<rgw::sal::MultipartUpload
> mpu
= target
->get_multipart_upload(key
.name
);
846 int ret
= mpu
->abort(this, cct
);
849 perfcounter
->inc(l_rgw_lc_abort_mpu
, 1);
852 if (ret
== -ERR_NO_SUCH_UPLOAD
) {
853 ldpp_dout(wk
->get_lc(), 5)
854 << "ERROR: abort_multipart_upload failed, ret=" << ret
855 << ", thread:" << wq
->thr_name()
856 << ", meta:" << obj
.key
859 ldpp_dout(wk
->get_lc(), 0)
860 << "ERROR: abort_multipart_upload failed, ret=" << ret
861 << ", thread:" << wq
->thr_name()
862 << ", meta:" << obj
.key
869 worker
->workpool
->setf(pf
);
871 for (auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end();
874 if (worker_should_stop(stop_at
, once
)) {
875 ldpp_dout(this, 5) << __func__
<< " interval budget EXPIRED worker "
881 if (!prefix_iter
->second
.status
|| prefix_iter
->second
.mp_expiration
<= 0) {
884 params
.prefix
= prefix_iter
->first
;
887 results
.objs
.clear();
888 ret
= target
->list(this, params
, 1000, results
, null_yield
);
890 if (ret
== (-ENOENT
))
892 ldpp_dout(this, 0) << "ERROR: driver->list_objects():" <<dendl
;
896 for (auto obj_iter
= results
.objs
.begin(); obj_iter
!= results
.objs
.end(); ++obj_iter
, ++offset
) {
897 std::tuple
<lc_op
, rgw_bucket_dir_entry
> t1
=
898 {prefix_iter
->second
, *obj_iter
};
899 worker
->workpool
->enqueue(WorkItem
{t1
});
905 if ((offset
% 100) == 0) {
906 if (worker_should_stop(stop_at
, once
)) {
907 ldpp_dout(this, 5) << __func__
<< " interval budget EXPIRED worker "
914 std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms
));
915 } while(results
.is_truncated
);
916 } /* for prefix_map */
918 worker
->workpool
->drain();
920 } /* RGWLC::handle_multipart_expiration */
922 static int read_obj_tags(const DoutPrefixProvider
*dpp
, rgw::sal::Object
* obj
, bufferlist
& tags_bl
)
924 std::unique_ptr
<rgw::sal::Object::ReadOp
> rop
= obj
->get_read_op();
926 return rop
->get_attr(dpp
, RGW_ATTR_TAGS
, tags_bl
, null_yield
);
929 static bool is_valid_op(const lc_op
& op
)
933 || op
.expiration_date
!= boost::none
934 || op
.noncur_expiration
> 0
936 || !op
.transitions
.empty()
937 || !op
.noncur_transitions
.empty()));
940 static bool zone_check(const lc_op
& op
, rgw::sal::Zone
* zone
)
943 if (zone
->get_tier_type() == "archive") {
944 return (op
.rule_flags
& uint32_t(LCFlagType::ArchiveZone
));
946 return (! (op
.rule_flags
& uint32_t(LCFlagType::ArchiveZone
)));
950 static inline bool has_all_tags(const lc_op
& rule_action
,
951 const RGWObjTags
& object_tags
)
953 if(! rule_action
.obj_tags
)
955 if(object_tags
.count() < rule_action
.obj_tags
->count())
957 size_t tag_count
= 0;
958 for (const auto& tag
: object_tags
.get_tags()) {
959 const auto& rule_tags
= rule_action
.obj_tags
->get_tags();
960 const auto& iter
= rule_tags
.find(tag
.first
);
961 if(iter
== rule_tags
.end())
963 if(iter
->second
== tag
.second
)
967 /* all tags in the rule appear in obj tags */
969 return tag_count
== rule_action
.obj_tags
->count();
972 static int check_tags(const DoutPrefixProvider
*dpp
, lc_op_ctx
& oc
, bool *skip
)
976 if (op
.obj_tags
!= boost::none
) {
980 int ret
= read_obj_tags(dpp
, oc
.obj
.get(), tags_bl
);
982 if (ret
!= -ENODATA
) {
983 ldpp_dout(oc
.dpp
, 5) << "ERROR: read_obj_tags returned r="
984 << ret
<< " " << oc
.wq
->thr_name() << dendl
;
988 RGWObjTags dest_obj_tags
;
990 auto iter
= tags_bl
.cbegin();
991 dest_obj_tags
.decode(iter
);
992 } catch (buffer::error
& err
) {
993 ldpp_dout(oc
.dpp
,0) << "ERROR: caught buffer::error, couldn't decode TagSet "
994 << oc
.wq
->thr_name() << dendl
;
998 if (! has_all_tags(op
, dest_obj_tags
)) {
999 ldpp_dout(oc
.dpp
, 20) << __func__
<< "() skipping obj " << oc
.obj
1000 << " as tags do not match in rule: "
1002 << oc
.wq
->thr_name() << dendl
;
1010 class LCOpFilter_Tags
: public LCOpFilter
{
1012 bool check(const DoutPrefixProvider
*dpp
, lc_op_ctx
& oc
) override
{
1015 if (o
.is_delete_marker()) {
1021 int ret
= check_tags(dpp
, oc
, &skip
);
1023 if (ret
== -ENOENT
) {
1026 ldpp_dout(oc
.dpp
, 0) << "ERROR: check_tags on obj=" << oc
.obj
1027 << " returned ret=" << ret
<< " "
1028 << oc
.wq
->thr_name() << dendl
;
1036 class LCOpAction_CurrentExpiration
: public LCOpAction
{
1038 LCOpAction_CurrentExpiration(op_env
& env
) {}
1040 bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
, const DoutPrefixProvider
*dpp
) override
{
1042 if (!o
.is_current()) {
1043 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
1044 << ": not current, skipping "
1045 << oc
.wq
->thr_name() << dendl
;
1048 if (o
.is_delete_marker()) {
1049 if (oc
.next_key_name
) {
1050 std::string nkn
= *oc
.next_key_name
;
1051 if (oc
.next_has_same_name(o
.key
.name
)) {
1052 ldpp_dout(dpp
, 7) << __func__
<< "(): dm-check SAME: key=" << o
.key
1053 << " next_key_name: %%" << nkn
<< "%% "
1054 << oc
.wq
->thr_name() << dendl
;
1057 ldpp_dout(dpp
, 7) << __func__
<< "(): dm-check DELE: key=" << o
.key
1058 << " next_key_name: %%" << nkn
<< "%% "
1059 << oc
.wq
->thr_name() << dendl
;
1060 *exp_time
= real_clock::now();
1067 auto& mtime
= o
.meta
.mtime
;
1070 if (op
.expiration
<= 0) {
1071 if (op
.expiration_date
== boost::none
) {
1072 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
1073 << ": no expiration set in rule, skipping "
1074 << oc
.wq
->thr_name() << dendl
;
1077 is_expired
= ceph_clock_now() >=
1078 ceph::real_clock::to_time_t(*op
.expiration_date
);
1079 *exp_time
= *op
.expiration_date
;
1081 is_expired
= obj_has_expired(dpp
, oc
.cct
, mtime
, op
.expiration
, exp_time
);
1084 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
<< ": is_expired="
1085 << (int)is_expired
<< " "
1086 << oc
.wq
->thr_name() << dendl
;
1090 int process(lc_op_ctx
& oc
) {
1093 if (o
.is_delete_marker()) {
1094 r
= remove_expired_obj(oc
.dpp
, oc
, true,
1095 rgw::notify::ObjectExpirationDeleteMarker
);
1097 ldpp_dout(oc
.dpp
, 0) << "ERROR: current is-dm remove_expired_obj "
1098 << oc
.bucket
<< ":" << o
.key
1099 << " " << cpp_strerror(r
) << " "
1100 << oc
.wq
->thr_name() << dendl
;
1103 ldpp_dout(oc
.dpp
, 2) << "DELETED: current is-dm "
1104 << oc
.bucket
<< ":" << o
.key
1105 << " " << oc
.wq
->thr_name() << dendl
;
1107 /* ! o.is_delete_marker() */
1108 r
= remove_expired_obj(oc
.dpp
, oc
, !oc
.bucket
->versioned(),
1109 rgw::notify::ObjectExpirationCurrent
);
1111 ldpp_dout(oc
.dpp
, 0) << "ERROR: remove_expired_obj "
1112 << oc
.bucket
<< ":" << o
.key
1113 << " " << cpp_strerror(r
) << " "
1114 << oc
.wq
->thr_name() << dendl
;
1118 perfcounter
->inc(l_rgw_lc_expire_current
, 1);
1120 ldpp_dout(oc
.dpp
, 2) << "DELETED:" << oc
.bucket
<< ":" << o
.key
1121 << " " << oc
.wq
->thr_name() << dendl
;
1127 class LCOpAction_NonCurrentExpiration
: public LCOpAction
{
1130 LCOpAction_NonCurrentExpiration(op_env
& env
)
1133 bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
, const DoutPrefixProvider
*dpp
) override
{
1135 if (o
.is_current()) {
1136 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
1137 << ": current version, skipping "
1138 << oc
.wq
->thr_name() << dendl
;
1142 int expiration
= oc
.op
.noncur_expiration
;
1143 bool is_expired
= obj_has_expired(dpp
, oc
.cct
, oc
.effective_mtime
, expiration
,
1146 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
<< ": is_expired="
1147 << is_expired
<< " "
1148 << oc
.wq
->thr_name() << dendl
;
1150 return is_expired
&&
1151 pass_object_lock_check(oc
.driver
, oc
.obj
.get(), dpp
);
1154 int process(lc_op_ctx
& oc
) {
1156 int r
= remove_expired_obj(oc
.dpp
, oc
, true,
1157 rgw::notify::ObjectExpirationNoncurrent
);
1159 ldpp_dout(oc
.dpp
, 0) << "ERROR: remove_expired_obj (non-current expiration) "
1160 << oc
.bucket
<< ":" << o
.key
1161 << " " << cpp_strerror(r
)
1162 << " " << oc
.wq
->thr_name() << dendl
;
1166 perfcounter
->inc(l_rgw_lc_expire_noncurrent
, 1);
1168 ldpp_dout(oc
.dpp
, 2) << "DELETED:" << oc
.bucket
<< ":" << o
.key
1169 << " (non-current expiration) "
1170 << oc
.wq
->thr_name() << dendl
;
1175 class LCOpAction_DMExpiration
: public LCOpAction
{
1177 LCOpAction_DMExpiration(op_env
& env
) {}
1179 bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
, const DoutPrefixProvider
*dpp
) override
{
1181 if (!o
.is_delete_marker()) {
1182 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
1183 << ": not a delete marker, skipping "
1184 << oc
.wq
->thr_name() << dendl
;
1187 if (oc
.next_has_same_name(o
.key
.name
)) {
1188 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
1189 << ": next is same object, skipping "
1190 << oc
.wq
->thr_name() << dendl
;
1194 *exp_time
= real_clock::now();
1199 int process(lc_op_ctx
& oc
) {
1201 int r
= remove_expired_obj(oc
.dpp
, oc
, true,
1202 rgw::notify::ObjectExpirationDeleteMarker
);
1204 ldpp_dout(oc
.dpp
, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
1205 << oc
.bucket
<< ":" << o
.key
1206 << " " << cpp_strerror(r
)
1207 << " " << oc
.wq
->thr_name()
1212 perfcounter
->inc(l_rgw_lc_expire_dm
, 1);
1214 ldpp_dout(oc
.dpp
, 2) << "DELETED:" << oc
.bucket
<< ":" << o
.key
1215 << " (delete marker expiration) "
1216 << oc
.wq
->thr_name() << dendl
;
1221 class LCOpAction_Transition
: public LCOpAction
{
1222 const transition_action
& transition
;
1223 bool need_to_process
{false};
1226 virtual bool check_current_state(bool is_current
) = 0;
1227 virtual ceph::real_time
get_effective_mtime(lc_op_ctx
& oc
) = 0;
1229 LCOpAction_Transition(const transition_action
& _transition
)
1230 : transition(_transition
) {}
1232 bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
, const DoutPrefixProvider
*dpp
) override
{
1235 if (o
.is_delete_marker()) {
1239 if (!check_current_state(o
.is_current())) {
1243 auto mtime
= get_effective_mtime(oc
);
1245 if (transition
.days
< 0) {
1246 if (transition
.date
== boost::none
) {
1247 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
1248 << ": no transition day/date set in rule, skipping "
1249 << oc
.wq
->thr_name() << dendl
;
1252 is_expired
= ceph_clock_now() >=
1253 ceph::real_clock::to_time_t(*transition
.date
);
1254 *exp_time
= *transition
.date
;
1256 is_expired
= obj_has_expired(dpp
, oc
.cct
, mtime
, transition
.days
, exp_time
);
1259 ldpp_dout(oc
.dpp
, 20) << __func__
<< "(): key=" << o
.key
<< ": is_expired="
1260 << is_expired
<< " "
1261 << oc
.wq
->thr_name() << dendl
;
1264 (rgw_placement_rule::get_canonical_storage_class(o
.meta
.storage_class
) !=
1265 transition
.storage_class
);
1270 bool should_process() override
{
1271 return need_to_process
;
1274 int delete_tier_obj(lc_op_ctx
& oc
) {
1277 /* If bucket is versioned, create delete_marker for current version
1279 if (oc
.bucket
->versioned() && oc
.o
.is_current() && !oc
.o
.is_delete_marker()) {
1280 ret
= remove_expired_obj(oc
.dpp
, oc
, false, rgw::notify::ObjectExpiration
);
1281 ldpp_dout(oc
.dpp
, 20) << "delete_tier_obj Object(key:" << oc
.o
.key
<< ") current & not delete_marker" << " versioned_epoch: " << oc
.o
.versioned_epoch
<< "flags: " << oc
.o
.flags
<< dendl
;
1283 ret
= remove_expired_obj(oc
.dpp
, oc
, true, rgw::notify::ObjectExpiration
);
1284 ldpp_dout(oc
.dpp
, 20) << "delete_tier_obj Object(key:" << oc
.o
.key
<< ") not current " << "versioned_epoch: " << oc
.o
.versioned_epoch
<< "flags: " << oc
.o
.flags
<< dendl
;
1289 int transition_obj_to_cloud(lc_op_ctx
& oc
) {
1290 /* If CurrentVersion object, remove it & create delete marker */
1291 bool delete_object
= (!oc
.tier
->retain_head_object() ||
1292 (oc
.o
.is_current() && oc
.bucket
->versioned()));
1294 int ret
= oc
.obj
->transition_to_cloud(oc
.bucket
, oc
.tier
.get(), oc
.o
,
1295 oc
.env
.worker
->get_cloud_targets(), oc
.cct
,
1296 !delete_object
, oc
.dpp
, null_yield
);
1301 if (delete_object
) {
1302 ret
= delete_tier_obj(oc
);
1304 ldpp_dout(oc
.dpp
, 0) << "ERROR: Deleting tier object(" << oc
.o
.key
<< ") failed ret=" << ret
<< dendl
;
1312 int process(lc_op_ctx
& oc
) {
1316 if (oc
.o
.meta
.category
== RGWObjCategory::CloudTiered
) {
1317 /* Skip objects which are already cloud tiered. */
1318 ldpp_dout(oc
.dpp
, 30) << "Object(key:" << oc
.o
.key
<< ") is already cloud tiered to cloud-s3 tier: " << oc
.o
.meta
.storage_class
<< dendl
;
1322 std::string tier_type
= "";
1323 rgw::sal::ZoneGroup
& zonegroup
= oc
.driver
->get_zone()->get_zonegroup();
1325 rgw_placement_rule target_placement
;
1326 target_placement
.inherit_from(oc
.bucket
->get_placement_rule());
1327 target_placement
.storage_class
= transition
.storage_class
;
1329 r
= zonegroup
.get_placement_tier(target_placement
, &oc
.tier
);
1331 if (!r
&& oc
.tier
->get_tier_type() == "cloud-s3") {
1332 ldpp_dout(oc
.dpp
, 30) << "Found cloud s3 tier: " << target_placement
.storage_class
<< dendl
;
1333 if (!oc
.o
.is_current() &&
1334 !pass_object_lock_check(oc
.driver
, oc
.obj
.get(), oc
.dpp
)) {
1335 /* Skip objects which has object lock enabled. */
1336 ldpp_dout(oc
.dpp
, 10) << "Object(key:" << oc
.o
.key
<< ") is locked. Skipping transition to cloud-s3 tier: " << target_placement
.storage_class
<< dendl
;
1340 r
= transition_obj_to_cloud(oc
);
1342 ldpp_dout(oc
.dpp
, 0) << "ERROR: failed to transition obj(key:" << oc
.o
.key
<< ") to cloud (r=" << r
<< ")"
1347 if (!oc
.driver
->valid_placement(target_placement
)) {
1348 ldpp_dout(oc
.dpp
, 0) << "ERROR: non existent dest placement: "
1350 << " bucket="<< oc
.bucket
1351 << " rule_id=" << oc
.op
.id
1352 << " " << oc
.wq
->thr_name() << dendl
;
1356 int r
= oc
.obj
->transition(oc
.bucket
, target_placement
, o
.meta
.mtime
,
1357 o
.versioned_epoch
, oc
.dpp
, null_yield
);
1359 ldpp_dout(oc
.dpp
, 0) << "ERROR: failed to transition obj "
1360 << oc
.bucket
<< ":" << o
.key
1361 << " -> " << transition
.storage_class
1362 << " " << cpp_strerror(r
)
1363 << " " << oc
.wq
->thr_name() << dendl
;
1367 ldpp_dout(oc
.dpp
, 2) << "TRANSITIONED:" << oc
.bucket
1368 << ":" << o
.key
<< " -> "
1369 << transition
.storage_class
1370 << " " << oc
.wq
->thr_name() << dendl
;
1375 class LCOpAction_CurrentTransition
: public LCOpAction_Transition
{
1377 bool check_current_state(bool is_current
) override
{
1381 ceph::real_time
get_effective_mtime(lc_op_ctx
& oc
) override
{
1382 return oc
.o
.meta
.mtime
;
1385 LCOpAction_CurrentTransition(const transition_action
& _transition
)
1386 : LCOpAction_Transition(_transition
) {}
1387 int process(lc_op_ctx
& oc
) {
1388 int r
= LCOpAction_Transition::process(oc
);
1391 perfcounter
->inc(l_rgw_lc_transition_current
, 1);
1398 class LCOpAction_NonCurrentTransition
: public LCOpAction_Transition
{
1400 bool check_current_state(bool is_current
) override
{
1404 ceph::real_time
get_effective_mtime(lc_op_ctx
& oc
) override
{
1405 return oc
.effective_mtime
;
1408 LCOpAction_NonCurrentTransition(op_env
& env
,
1409 const transition_action
& _transition
)
1410 : LCOpAction_Transition(_transition
)
1412 int process(lc_op_ctx
& oc
) {
1413 int r
= LCOpAction_Transition::process(oc
);
1416 perfcounter
->inc(l_rgw_lc_transition_noncurrent
, 1);
1423 void LCOpRule::build()
1425 filters
.emplace_back(new LCOpFilter_Tags
);
1429 if (op
.expiration
> 0 ||
1430 op
.expiration_date
!= boost::none
) {
1431 actions
.emplace_back(new LCOpAction_CurrentExpiration(env
));
1434 if (op
.dm_expiration
) {
1435 actions
.emplace_back(new LCOpAction_DMExpiration(env
));
1438 if (op
.noncur_expiration
> 0) {
1439 actions
.emplace_back(new LCOpAction_NonCurrentExpiration(env
));
1442 for (auto& iter
: op
.transitions
) {
1443 actions
.emplace_back(new LCOpAction_CurrentTransition(iter
.second
));
1446 for (auto& iter
: op
.noncur_transitions
) {
1447 actions
.emplace_back(new LCOpAction_NonCurrentTransition(env
, iter
.second
));
1451 void LCOpRule::update()
1453 next_key_name
= env
.ol
.next_key_name();
1454 effective_mtime
= env
.ol
.get_prev_obj().meta
.mtime
;
1457 int LCOpRule::process(rgw_bucket_dir_entry
& o
,
1458 const DoutPrefixProvider
*dpp
,
1461 lc_op_ctx
ctx(env
, o
, next_key_name
, effective_mtime
, dpp
, wq
);
1462 shared_ptr
<LCOpAction
> *selected
= nullptr; // n.b., req'd by sharing
1465 for (auto& a
: actions
) {
1466 real_time action_exp
;
1468 if (a
->check(ctx
, &action_exp
, dpp
)) {
1469 if (action_exp
> exp
) {
1477 (*selected
)->should_process()) {
1480 * Calling filter checks after action checks because
1481 * all action checks (as they are implemented now) do
1482 * not access the objects themselves, but return result
1483 * from info from bucket index listing. The current tags filter
1484 * check does access the objects, so we avoid unnecessary rados calls
1485 * having filters check later in the process.
1489 for (auto& f
: filters
) {
1490 if (f
->check(dpp
, ctx
)) {
1497 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
1498 << ": no rule match, skipping "
1499 << wq
->thr_name() << dendl
;
1503 int r
= (*selected
)->process(ctx
);
1505 ldpp_dout(dpp
, 0) << "ERROR: remove_expired_obj "
1506 << env
.bucket
<< ":" << o
.key
1507 << " " << cpp_strerror(r
)
1508 << " " << wq
->thr_name() << dendl
;
1511 ldpp_dout(dpp
, 20) << "processed:" << env
.bucket
<< ":"
1512 << o
.key
<< " " << wq
->thr_name() << dendl
;
1519 int RGWLC::bucket_lc_process(string
& shard_id
, LCWorker
* worker
,
1520 time_t stop_at
, bool once
)
1522 RGWLifecycleConfiguration
config(cct
);
1523 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
1524 string no_ns
, list_versions
;
1525 vector
<rgw_bucket_dir_entry
> objs
;
1526 vector
<std::string
> result
;
1527 boost::split(result
, shard_id
, boost::is_any_of(":"));
1528 string bucket_tenant
= result
[0];
1529 string bucket_name
= result
[1];
1530 string bucket_marker
= result
[2];
1532 ldpp_dout(this, 5) << "RGWLC::bucket_lc_process ENTER " << bucket_name
<< dendl
;
1533 if (unlikely(cct
->_conf
->rgwlc_skip_bucket_step
)) {
1537 int ret
= driver
->get_bucket(this, nullptr, bucket_tenant
, bucket_name
, &bucket
, null_yield
);
1539 ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name
1540 << " failed" << dendl
;
1544 ret
= bucket
->load_bucket(this, null_yield
);
1546 ldpp_dout(this, 0) << "LC:load_bucket for " << bucket_name
1547 << " failed" << dendl
;
1551 auto stack_guard
= make_scope_guard(
1554 worker
->workpool
->drain();
1558 if (bucket
->get_marker() != bucket_marker
) {
1559 ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket="
1560 << bucket_tenant
<< ":" << bucket_name
1561 << " cur_marker=" << bucket
->get_marker()
1562 << " orig_marker=" << bucket_marker
<< dendl
;
1566 map
<string
, bufferlist
>::iterator aiter
1567 = bucket
->get_attrs().find(RGW_ATTR_LC
);
1568 if (aiter
== bucket
->get_attrs().end()) {
1569 ldpp_dout(this, 0) << "WARNING: bucket_attrs.find(RGW_ATTR_LC) failed for "
1570 << bucket_name
<< " (terminates bucket_lc_process(...))"
1575 bufferlist::const_iterator iter
{&aiter
->second
};
1577 config
.decode(iter
);
1578 } catch (const buffer::error
& e
) {
1579 ldpp_dout(this, 0) << __func__
<< "() decode life cycle config failed"
1584 /* fetch information for zone checks */
1585 rgw::sal::Zone
* zone
= driver
->get_zone();
1587 auto pf
= [](RGWLC::LCWorker
* wk
, WorkQ
* wq
, WorkItem
& wi
) {
1589 boost::get
<std::tuple
<LCOpRule
, rgw_bucket_dir_entry
>>(wi
);
1590 auto& [op_rule
, o
] = wt
;
1592 ldpp_dout(wk
->get_lc(), 20)
1593 << __func__
<< "(): key=" << o
.key
<< wq
->thr_name()
1595 int ret
= op_rule
.process(o
, wk
->dpp
, wq
);
1597 ldpp_dout(wk
->get_lc(), 20)
1598 << "ERROR: orule.process() returned ret=" << ret
1599 << "thread:" << wq
->thr_name()
1603 worker
->workpool
->setf(pf
);
1605 multimap
<string
, lc_op
>& prefix_map
= config
.get_prefix_map();
1606 ldpp_dout(this, 10) << __func__
<< "() prefix_map size="
1607 << prefix_map
.size()
1610 rgw_obj_key pre_marker
;
1611 rgw_obj_key next_marker
;
1612 for(auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end();
1615 if (worker_should_stop(stop_at
, once
)) {
1616 ldpp_dout(this, 5) << __func__
<< " interval budget EXPIRED worker "
1622 auto& op
= prefix_iter
->second
;
1623 if (!is_valid_op(op
)) {
1626 ldpp_dout(this, 20) << __func__
<< "(): prefix=" << prefix_iter
->first
1628 if (prefix_iter
!= prefix_map
.begin() &&
1629 (prefix_iter
->first
.compare(0, prev(prefix_iter
)->first
.length(),
1630 prev(prefix_iter
)->first
) == 0)) {
1631 next_marker
= pre_marker
;
1633 pre_marker
= next_marker
;
1636 LCObjsLister
ol(driver
, bucket
.get());
1637 ol
.set_prefix(prefix_iter
->first
);
1639 if (! zone_check(op
, zone
)) {
1640 ldpp_dout(this, 7) << "LC rule not executable in " << zone
->get_tier_type()
1641 << " zone, skipping" << dendl
;
1645 ret
= ol
.init(this);
1647 if (ret
== (-ENOENT
))
1649 ldpp_dout(this, 0) << "ERROR: driver->list_objects():" << dendl
;
1653 op_env
oenv(op
, driver
, worker
, bucket
.get(), ol
);
1654 LCOpRule
orule(oenv
);
1655 orule
.build(); // why can't ctor do it?
1656 rgw_bucket_dir_entry
* o
{nullptr};
1657 for (auto offset
= 0; ol
.get_obj(this, &o
/* , fetch_barrier */); ++offset
, ol
.next()) {
1659 std::tuple
<LCOpRule
, rgw_bucket_dir_entry
> t1
= {orule
, *o
};
1660 worker
->workpool
->enqueue(WorkItem
{t1
});
1661 if ((offset
% 100) == 0) {
1662 if (worker_should_stop(stop_at
, once
)) {
1663 ldpp_dout(this, 5) << __func__
<< " interval budget EXPIRED worker "
1670 worker
->workpool
->drain();
1673 ret
= handle_multipart_expiration(bucket
.get(), prefix_map
, worker
, stop_at
, once
);
1679 const int max_retries
;
1680 std::chrono::milliseconds sleep_ms
;
1683 SimpleBackoff(int max_retries
, std::chrono::milliseconds initial_sleep_ms
)
1684 : max_retries(max_retries
), sleep_ms(initial_sleep_ms
)
1686 SimpleBackoff(const SimpleBackoff
&) = delete;
1687 SimpleBackoff
& operator=(const SimpleBackoff
&) = delete;
1689 int get_retries() const {
1697 bool wait_backoff(const fu2::unique_function
<bool(void) const>& barrier
) {
1699 while (retries
< max_retries
) {
1704 std::this_thread::sleep_for(sleep_ms
* 2 * retries
++);
1710 int RGWLC::bucket_lc_post(int index
, int max_lock_sec
,
1711 rgw::sal::Lifecycle::LCEntry
& entry
, int& result
,
1714 utime_t
lock_duration(cct
->_conf
->rgw_lc_lock_max_time
, 0);
1716 std::unique_ptr
<rgw::sal::LCSerializer
> lock
=
1717 sal_lc
->get_serializer(lc_index_lock_name
, obj_names
[index
], cookie
);
1719 ldpp_dout(this, 5) << "RGWLC::bucket_lc_post(): POST " << entry
1720 << " index: " << index
<< " worker ix: " << worker
->ix
1724 int ret
= lock
->try_lock(this, lock_duration
, null_yield
);
1725 if (ret
== -EBUSY
|| ret
== -EEXIST
) {
1726 /* already locked by another lc processor */
1727 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
1728 << obj_names
[index
] << ", sleep 5, try again " << dendl
;
1735 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names
[index
]
1738 if (result
== -ENOENT
) {
1739 /* XXXX are we SURE the only way result could == ENOENT is when
1740 * there is no such bucket? It is currently the value returned
1741 * from bucket_lc_process(...) */
1742 ret
= sal_lc
->rm_entry(obj_names
[index
], entry
);
1744 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
1745 << obj_names
[index
] << dendl
;
1748 } else if (result
< 0) {
1749 entry
.set_status(lc_failed
);
1751 entry
.set_status(lc_complete
);
1754 ret
= sal_lc
->set_entry(obj_names
[index
], entry
);
1756 ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
1757 << obj_names
[index
] << dendl
;
1761 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock "
1762 << obj_names
[index
] << dendl
;
1765 } /* RGWLC::bucket_lc_post */
1767 int RGWLC::list_lc_progress(string
& marker
, uint32_t max_entries
,
1768 vector
<std::unique_ptr
<rgw::sal::Lifecycle::LCEntry
>>& progress_map
,
1771 progress_map
.clear();
1772 for(; index
< max_objs
; index
++, marker
="") {
1773 vector
<std::unique_ptr
<rgw::sal::Lifecycle::LCEntry
>> entries
;
1774 int ret
= sal_lc
->list_entries(obj_names
[index
], marker
, max_entries
, entries
);
1776 if (ret
== -ENOENT
) {
1777 ldpp_dout(this, 10) << __func__
<< "() ignoring unfound lc object="
1778 << obj_names
[index
] << dendl
;
1784 progress_map
.reserve(progress_map
.size() + entries
.size());
1785 std::move(begin(entries
), end(entries
), std::back_inserter(progress_map
));
1786 //progress_map.insert(progress_map.end(), entries.begin(), entries.end());
1788 /* update index, marker tuple */
1789 if (progress_map
.size() > 0)
1790 marker
= progress_map
.back()->get_bucket();
1792 if (progress_map
.size() >= max_entries
)
/* Produce a uniformly shuffled sequence of the integers [0, n).
 * Used to visit lifecycle index shards in an order unrelated to any
 * other instance that might be running in parallel. */
static inline std::vector<int> random_sequence(uint32_t n)
{
  std::vector<int> v(n, 0);
  /* fill with 0..n-1, then shuffle with a freshly seeded engine */
  std::generate(v.begin(), v.end(),
		[ix = 0]() mutable {
		  return ix++;
		});
  std::random_device rd;
  std::default_random_engine rng{rd()};
  std::shuffle(v.begin(), v.end(), rng);
  return v;
}
1811 static inline int get_lc_index(CephContext
*cct
,
1812 const std::string
& shard_id
)
1815 (cct
->_conf
->rgw_lc_max_objs
> HASH_PRIME
? HASH_PRIME
:
1816 cct
->_conf
->rgw_lc_max_objs
);
1817 /* n.b. review hash algo */
1818 int index
= ceph_str_hash_linux(shard_id
.c_str(),
1819 shard_id
.size()) % HASH_PRIME
% max_objs
;
1823 static inline void get_lc_oid(CephContext
*cct
,
1824 const std::string
& shard_id
, string
*oid
)
1826 /* n.b. review hash algo */
1827 int index
= get_lc_index(cct
, shard_id
);
1828 *oid
= lc_oid_prefix
;
1830 snprintf(buf
, 32, ".%d", index
);
1835 static std::string
get_bucket_lc_key(const rgw_bucket
& bucket
){
1836 return string_join_reserve(':', bucket
.tenant
, bucket
.name
, bucket
.marker
);
1839 int RGWLC::process(LCWorker
* worker
,
1840 const std::unique_ptr
<rgw::sal::Bucket
>& optional_bucket
,
1844 int max_secs
= cct
->_conf
->rgw_lc_lock_max_time
;
1846 if (optional_bucket
) {
1847 /* if a bucket is provided, this is a single-bucket run, and
1848 * can be processed without traversing any state entries (we
1849 * do need the entry {pro,epi}logue which update the state entry
1850 * for this bucket) */
1851 auto bucket_lc_key
= get_bucket_lc_key(optional_bucket
->get_key());
1852 auto index
= get_lc_index(driver
->ctx(), bucket_lc_key
);
1853 ret
= process_bucket(index
, max_secs
, worker
, bucket_lc_key
, once
);
1856 /* generate an index-shard sequence unrelated to any other
1857 * that might be running in parallel */
1858 std::string all_buckets
{""};
1859 vector
<int> shard_seq
= random_sequence(max_objs
);
1860 for (auto index
: shard_seq
) {
1861 ret
= process(index
, max_secs
, worker
, once
);
1870 bool RGWLC::expired_session(time_t started
)
1872 if (! cct
->_conf
->rgwlc_auto_session_clear
) {
1876 time_t interval
= (cct
->_conf
->rgw_lc_debug_interval
> 0)
1877 ? cct
->_conf
->rgw_lc_debug_interval
1880 auto now
= time(nullptr);
1882 ldpp_dout(this, 16) << "RGWLC::expired_session"
1883 << " started: " << started
1884 << " interval: " << interval
<< "(*2==" << 2*interval
<< ")"
1888 return (started
+ 2*interval
< now
);
1891 time_t RGWLC::thread_stop_at()
1893 uint64_t interval
= (cct
->_conf
->rgw_lc_debug_interval
> 0)
1894 ? cct
->_conf
->rgw_lc_debug_interval
1897 return time(nullptr) + interval
;
1900 int RGWLC::process_bucket(int index
, int max_lock_secs
, LCWorker
* worker
,
1901 const std::string
& bucket_entry_marker
,
1904 ldpp_dout(this, 5) << "RGWLC::process_bucket(): ENTER: "
1905 << "index: " << index
<< " worker ix: " << worker
->ix
1909 std::unique_ptr
<rgw::sal::LCSerializer
> serializer
=
1910 sal_lc
->get_serializer(lc_index_lock_name
, obj_names
[index
],
1911 worker
->thr_name());
1912 std::unique_ptr
<rgw::sal::Lifecycle::LCEntry
> entry
;
1913 if (max_lock_secs
<= 0) {
1917 utime_t
time(max_lock_secs
, 0);
1918 ret
= serializer
->try_lock(this, time
, null_yield
);
1919 if (ret
== -EBUSY
|| ret
== -EEXIST
) {
1920 /* already locked by another lc processor */
1921 ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
1922 << obj_names
[index
] << dendl
;
1928 std::unique_lock
<rgw::sal::LCSerializer
> lock(
1929 *(serializer
.get()), std::adopt_lock
);
1931 ret
= sal_lc
->get_entry(obj_names
[index
], bucket_entry_marker
, &entry
);
1933 if (entry
->get_status() == lc_processing
) {
1934 if (expired_session(entry
->get_start_time())) {
1935 ldpp_dout(this, 5) << "RGWLC::process_bucket(): STALE lc session found for: " << entry
1936 << " index: " << index
<< " worker ix: " << worker
->ix
1940 ldpp_dout(this, 5) << "RGWLC::process_bucket(): ACTIVE entry: "
1942 << " index: " << index
1943 << " worker ix: " << worker
->ix
1950 /* do nothing if no bucket */
1951 if (entry
->get_bucket().empty()) {
1955 ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 1: " << entry
1956 << " index: " << index
<< " worker ix: " << worker
->ix
1959 entry
->set_status(lc_processing
);
1960 ret
= sal_lc
->set_entry(obj_names
[index
], *entry
);
1962 ldpp_dout(this, 0) << "RGWLC::process_bucket() failed to set obj entry "
1963 << obj_names
[index
] << entry
->get_bucket() << entry
->get_status()
1968 ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 2: " << entry
1969 << " index: " << index
<< " worker ix: " << worker
->ix
1973 ret
= bucket_lc_process(entry
->get_bucket(), worker
, thread_stop_at(), once
);
1974 bucket_lc_post(index
, max_lock_secs
, *entry
, ret
, worker
);
1977 } /* RGWLC::process_bucket */
1979 static inline bool allow_shard_rollover(CephContext
* cct
, time_t now
, time_t shard_rollover_date
)
1982 * - non-debug scheduling is in effect, and
1983 * - the current shard has not rolled over in the last 24 hours
1985 if (((shard_rollover_date
< now
) &&
1986 (now
- shard_rollover_date
> 24*60*60)) ||
1987 (! shard_rollover_date
/* no rollover date stored */) ||
1988 (cct
->_conf
->rgw_lc_debug_interval
> 0 /* defaults to -1 == disabled */)) {
1992 } /* allow_shard_rollover */
1994 static inline bool already_run_today(CephContext
* cct
, time_t start_date
)
1997 time_t begin_of_day
;
1998 utime_t now
= ceph_clock_now();
1999 localtime_r(&start_date
, &bdt
);
2001 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
2002 if (now
- start_date
< cct
->_conf
->rgw_lc_debug_interval
)
2011 begin_of_day
= mktime(&bdt
);
2012 if (now
- begin_of_day
< 24*60*60)
2016 } /* already_run_today */
2018 inline int RGWLC::advance_head(const std::string
& lc_shard
,
2019 rgw::sal::Lifecycle::LCHead
& head
,
2020 rgw::sal::Lifecycle::LCEntry
& entry
,
2024 std::unique_ptr
<rgw::sal::Lifecycle::LCEntry
> next_entry
;
2026 ret
= sal_lc
->get_next_entry(lc_shard
, entry
.get_bucket(), &next_entry
);
2028 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
2029 << lc_shard
<< dendl
;
2033 /* save the next position */
2034 head
.set_marker(next_entry
->get_bucket());
2035 head
.set_start_date(start_date
);
2037 ret
= sal_lc
->put_head(lc_shard
, head
);
2039 ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
2046 } /* advance head */
2048 int RGWLC::process(int index
, int max_lock_secs
, LCWorker
* worker
,
2052 const auto& lc_shard
= obj_names
[index
];
2054 std::unique_ptr
<rgw::sal::Lifecycle::LCHead
> head
;
2055 std::unique_ptr
<rgw::sal::Lifecycle::LCEntry
> entry
; //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
2057 ldpp_dout(this, 5) << "RGWLC::process(): ENTER: "
2058 << "index: " << index
<< " worker ix: " << worker
->ix
2061 std::unique_ptr
<rgw::sal::LCSerializer
> lock
=
2062 sal_lc
->get_serializer(lc_index_lock_name
, lc_shard
, worker
->thr_name());
2064 utime_t
lock_for_s(max_lock_secs
, 0);
2065 const auto& lock_lambda
= [&]() {
2066 ret
= lock
->try_lock(this, lock_for_s
, null_yield
);
2070 if (ret
== -EBUSY
|| ret
== -EEXIST
) {
2071 /* already locked by another lc processor */
2077 SimpleBackoff
shard_lock(5 /* max retries */, 50ms
);
2078 if (! shard_lock
.wait_backoff(lock_lambda
)) {
2079 ldpp_dout(this, 0) << "RGWLC::process(): failed to aquire lock on "
2080 << lc_shard
<< " after " << shard_lock
.get_retries()
2086 utime_t now
= ceph_clock_now();
2088 /* preamble: find an inital bucket/marker */
2089 ret
= sal_lc
->get_head(lc_shard
, &head
);
2091 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
2092 << lc_shard
<< ", ret=" << ret
<< dendl
;
2096 /* if there is nothing at head, try to reinitialize head.marker with the
2097 * first entry in the queue */
2098 if (head
->get_marker().empty() &&
2099 allow_shard_rollover(cct
, now
, head
->get_shard_rollover_date()) /* prevent multiple passes by diff.
2100 * rgws,in same cycle */) {
2102 ldpp_dout(this, 5) << "RGWLC::process() process shard rollover lc_shard=" << lc_shard
2103 << " head.marker=" << head
->get_marker()
2104 << " head.shard_rollover_date=" << head
->get_shard_rollover_date()
2107 vector
<std::unique_ptr
<rgw::sal::Lifecycle::LCEntry
>> entries
;
2108 int ret
= sal_lc
->list_entries(lc_shard
, head
->get_marker(), 1, entries
);
2110 ldpp_dout(this, 0) << "RGWLC::process() sal_lc->list_entries(lc_shard, head.marker, 1, "
2111 << "entries) returned error ret==" << ret
<< dendl
;
2114 if (entries
.size() > 0) {
2115 entry
= std::move(entries
.front());
2116 head
->set_marker(entry
->get_bucket());
2117 head
->set_start_date(now
);
2118 head
->set_shard_rollover_date(0);
2121 ldpp_dout(this, 0) << "RGWLC::process() head.marker !empty() at START for shard=="
2122 << lc_shard
<< " head last stored at "
2123 << rgw_to_asctime(utime_t(time_t(head
->get_start_date()), 0))
2126 /* fetches the entry pointed to by head.bucket */
2127 ret
= sal_lc
->get_entry(lc_shard
, head
->get_marker(), &entry
);
2129 ldpp_dout(this, 0) << "RGWLC::process() sal_lc->get_entry(lc_shard, head.marker, entry) "
2130 << "returned error ret==" << ret
<< dendl
;
2135 if (entry
&& !entry
->get_bucket().empty()) {
2136 if (entry
->get_status() == lc_processing
) {
2137 if (expired_session(entry
->get_start_time())) {
2139 << "RGWLC::process(): STALE lc session found for: " << entry
2140 << " index: " << index
<< " worker ix: " << worker
->ix
2141 << " (clearing)" << dendl
;
2144 << "RGWLC::process(): ACTIVE entry: " << entry
2145 << " index: " << index
<< " worker ix: " << worker
->ix
<< dendl
;
2146 /* skip to next entry */
2147 if (advance_head(lc_shard
, *head
.get(), *entry
.get(), now
) < 0) {
2150 /* done with this shard */
2151 if (head
->get_marker().empty()) {
2152 ldpp_dout(this, 5) <<
2153 "RGWLC::process() cycle finished lc_shard="
2156 head
->set_shard_rollover_date(ceph_clock_now());
2157 ret
= sal_lc
->put_head(lc_shard
, *head
.get());
2159 ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
2168 if ((entry
->get_status() == lc_complete
) &&
2169 already_run_today(cct
, entry
->get_start_time())) {
2170 /* skip to next entry */
2171 if (advance_head(lc_shard
, *head
.get(), *entry
.get(), now
) < 0) {
2174 ldpp_dout(this, 5) << "RGWLC::process() worker ix; " << worker
->ix
2175 << " SKIP processing for already-processed bucket " << entry
->get_bucket()
2177 /* done with this shard */
2178 if (head
->get_marker().empty()) {
2179 ldpp_dout(this, 5) <<
2180 "RGWLC::process() cycle finished lc_shard="
2183 head
->set_shard_rollover_date(ceph_clock_now());
2184 ret
= sal_lc
->put_head(lc_shard
, *head
.get());
2186 ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
2196 ldpp_dout(this, 5) << "RGWLC::process() entry.bucket.empty() == true at START 1"
2197 << " (this is possible mainly before any lc policy has been stored"
2198 << " or after removal of an lc_shard object)"
2203 /* When there are no more entries to process, entry will be
2204 * equivalent to an empty marker and so the following resets the
2205 * processing for the shard automatically when processing is
2206 * finished for the shard */
2207 ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
2208 << " index: " << index
<< " worker ix: " << worker
->ix
2211 entry
->set_status(lc_processing
);
2212 entry
->set_start_time(now
);
2214 ret
= sal_lc
->set_entry(lc_shard
, *entry
);
2216 ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
2217 << lc_shard
<< entry
->get_bucket() << entry
->get_status() << dendl
;
2221 /* advance head for next waiter, then process */
2222 if (advance_head(lc_shard
, *head
.get(), *entry
.get(), now
) < 0) {
2226 ldpp_dout(this, 5) << "RGWLC::process(): START entry 2: " << entry
2227 << " index: " << index
<< " worker ix: " << worker
->ix
2230 /* drop lock so other instances can make progress while this
2231 * bucket is being processed */
2233 ret
= bucket_lc_process(entry
->get_bucket(), worker
, thread_stop_at(), once
);
2236 //bucket_lc_post(index, max_lock_secs, entry, ret, worker);
2237 if (! shard_lock
.wait_backoff(lock_lambda
)) {
2238 ldpp_dout(this, 0) << "RGWLC::process(): failed to aquire lock on "
2239 << lc_shard
<< " after " << shard_lock
.get_retries()
2244 if (ret
== -ENOENT
) {
2245 /* XXXX are we SURE the only way result could == ENOENT is when
2246 * there is no such bucket? It is currently the value returned
2247 * from bucket_lc_process(...) */
2248 ret
= sal_lc
->rm_entry(lc_shard
, *entry
);
2250 ldpp_dout(this, 0) << "RGWLC::process() failed to remove entry "
2251 << lc_shard
<< " (nonfatal)"
2253 /* not fatal, could result from a race */
2257 entry
->set_status(lc_failed
);
2259 entry
->set_status(lc_complete
);
2261 ret
= sal_lc
->set_entry(lc_shard
, *entry
);
2263 ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
2271 /* done with this shard */
2272 if (head
->get_marker().empty()) {
2273 ldpp_dout(this, 5) <<
2274 "RGWLC::process() cycle finished lc_shard="
2277 head
->set_shard_rollover_date(ceph_clock_now());
2278 ret
= sal_lc
->put_head(lc_shard
, *head
.get());
2280 ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
2286 } while(1 && !once
&& !going_down());
2293 void RGWLC::start_processor()
2295 auto maxw
= cct
->_conf
->rgw_lc_max_worker
;
2296 workers
.reserve(maxw
);
2297 for (int ix
= 0; ix
< maxw
; ++ix
) {
2299 std::make_unique
<RGWLC::LCWorker
>(this /* dpp */, cct
, this, ix
);
2300 worker
->create((string
{"lifecycle_thr_"} + to_string(ix
)).c_str());
2301 workers
.emplace_back(std::move(worker
));
2305 void RGWLC::stop_processor()
2308 for (auto& worker
: workers
) {
2315 unsigned RGWLC::get_subsys() const
2320 std::ostream
& RGWLC::gen_prefix(std::ostream
& out
) const
2322 return out
<< "lifecycle: ";
2325 void RGWLC::LCWorker::stop()
2327 std::lock_guard l
{lock
};
2331 bool RGWLC::going_down()
2336 bool RGWLC::LCWorker::should_work(utime_t
& now
)
2342 string worktime
= cct
->_conf
->rgw_lifecycle_work_time
;
2343 sscanf(worktime
.c_str(),"%d:%d-%d:%d",&start_hour
, &start_minute
,
2344 &end_hour
, &end_minute
);
2346 time_t tt
= now
.sec();
2347 localtime_r(&tt
, &bdt
);
2349 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
2350 /* We're debugging, so say we can run */
2352 } else if ((bdt
.tm_hour
*60 + bdt
.tm_min
>= start_hour
*60 + start_minute
) &&
2353 (bdt
.tm_hour
*60 + bdt
.tm_min
<= end_hour
*60 + end_minute
)) {
2361 int RGWLC::LCWorker::schedule_next_start_time(utime_t
&start
, utime_t
& now
)
2365 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
2366 secs
= start
+ cct
->_conf
->rgw_lc_debug_interval
- now
;
2376 string worktime
= cct
->_conf
->rgw_lifecycle_work_time
;
2377 sscanf(worktime
.c_str(),"%d:%d-%d:%d",&start_hour
, &start_minute
, &end_hour
,
2380 time_t tt
= now
.sec();
2382 localtime_r(&tt
, &bdt
);
2383 bdt
.tm_hour
= start_hour
;
2384 bdt
.tm_min
= start_minute
;
2389 return secs
>0 ? secs
: secs
+24*60*60;
2392 RGWLC::LCWorker::~LCWorker()
2397 void RGWLifecycleConfiguration::generate_test_instances(
2398 list
<RGWLifecycleConfiguration
*>& o
)
2400 o
.push_back(new RGWLifecycleConfiguration
);
2403 template<typename F
>
2404 static int guard_lc_modify(const DoutPrefixProvider
*dpp
,
2405 rgw::sal::Driver
* driver
,
2406 rgw::sal::Lifecycle
* sal_lc
,
2407 const rgw_bucket
& bucket
, const string
& cookie
,
2409 CephContext
*cct
= driver
->ctx();
2411 auto bucket_lc_key
= get_bucket_lc_key(bucket
);
2413 get_lc_oid(cct
, bucket_lc_key
, &oid
);
2415 /* XXX it makes sense to take shard_id for a bucket_id? */
2416 std::unique_ptr
<rgw::sal::Lifecycle::LCEntry
> entry
= sal_lc
->get_entry();
2417 entry
->set_bucket(bucket_lc_key
);
2418 entry
->set_status(lc_uninitial
);
2419 int max_lock_secs
= cct
->_conf
->rgw_lc_lock_max_time
;
2421 std::unique_ptr
<rgw::sal::LCSerializer
> lock
=
2422 sal_lc
->get_serializer(lc_index_lock_name
, oid
, cookie
);
2423 utime_t
time(max_lock_secs
, 0);
2426 uint16_t retries
{0};
2428 // due to reports of starvation trying to save lifecycle policy, try hard
2430 ret
= lock
->try_lock(dpp
, time
, null_yield
);
2431 if (ret
== -EBUSY
|| ret
== -EEXIST
) {
2432 ldpp_dout(dpp
, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
2433 << oid
<< ", retry in 100ms, ret=" << ret
<< dendl
;
2434 std::this_thread::sleep_for(std::chrono::milliseconds(100));
2435 // the typical S3 client will time out in 60s
2436 if(retries
++ < 500) {
2441 ldpp_dout(dpp
, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
2442 << oid
<< ", ret=" << ret
<< dendl
;
2445 ret
= f(sal_lc
, oid
, *entry
.get());
2447 ldpp_dout(dpp
, 0) << "RGWLC::RGWPutLC() failed to set entry on "
2448 << oid
<< ", ret=" << ret
<< dendl
;
2456 int RGWLC::set_bucket_config(rgw::sal::Bucket
* bucket
,
2457 const rgw::sal::Attrs
& bucket_attrs
,
2458 RGWLifecycleConfiguration
*config
)
2461 rgw::sal::Attrs attrs
= bucket_attrs
;
2463 /* if no RGWLifecycleconfiguration provided, it means
2464 * RGW_ATTR_LC is already valid and present */
2466 config
->encode(lc_bl
);
2467 attrs
[RGW_ATTR_LC
] = std::move(lc_bl
);
2470 bucket
->merge_and_store_attrs(this, attrs
, null_yield
);
2476 rgw_bucket
& b
= bucket
->get_key();
2479 ret
= guard_lc_modify(this, driver
, sal_lc
.get(), b
, cookie
,
2480 [&](rgw::sal::Lifecycle
* sal_lc
, const string
& oid
,
2481 rgw::sal::Lifecycle::LCEntry
& entry
) {
2482 return sal_lc
->set_entry(oid
, entry
);
2488 int RGWLC::remove_bucket_config(rgw::sal::Bucket
* bucket
,
2489 const rgw::sal::Attrs
& bucket_attrs
,
2492 rgw::sal::Attrs attrs
= bucket_attrs
;
2493 rgw_bucket
& b
= bucket
->get_key();
2497 attrs
.erase(RGW_ATTR_LC
);
2498 ret
= bucket
->merge_and_store_attrs(this, attrs
, null_yield
);
2501 ldpp_dout(this, 0) << "RGWLC::RGWDeleteLC() failed to set attrs on bucket="
2502 << b
.name
<< " returned err=" << ret
<< dendl
;
2507 ret
= guard_lc_modify(this, driver
, sal_lc
.get(), b
, cookie
,
2508 [&](rgw::sal::Lifecycle
* sal_lc
, const string
& oid
,
2509 rgw::sal::Lifecycle::LCEntry
& entry
) {
2510 return sal_lc
->rm_entry(oid
, entry
);
2514 } /* RGWLC::remove_bucket_config */
2524 int fix_lc_shard_entry(const DoutPrefixProvider
*dpp
,
2525 rgw::sal::Driver
* driver
,
2526 rgw::sal::Lifecycle
* sal_lc
,
2527 rgw::sal::Bucket
* bucket
)
2529 if (auto aiter
= bucket
->get_attrs().find(RGW_ATTR_LC
);
2530 aiter
== bucket
->get_attrs().end()) {
2531 return 0; // No entry, nothing to fix
2534 auto bucket_lc_key
= get_bucket_lc_key(bucket
->get_key());
2536 get_lc_oid(driver
->ctx(), bucket_lc_key
, &lc_oid
);
2538 std::unique_ptr
<rgw::sal::Lifecycle::LCEntry
> entry
;
2539 // There are multiple cases we need to encounter here
2540 // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
2541 // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update
2542 // 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker)
2543 // We are not dropping the old marker here as that would be caught by the next LC process update
2544 int ret
= sal_lc
->get_entry(lc_oid
, bucket_lc_key
, &entry
);
2546 ldpp_dout(dpp
, 5) << "Entry already exists, nothing to do" << dendl
;
2547 return ret
; // entry is already existing correctly set to marker
2549 ldpp_dout(dpp
, 5) << "lc_get_entry errored ret code=" << ret
<< dendl
;
2550 if (ret
== -ENOENT
) {
2551 ldpp_dout(dpp
, 1) << "No entry for bucket=" << bucket
2552 << " creating " << dendl
;
2553 // TODO: we have too many ppl making cookies like this!
2554 char cookie_buf
[COOKIE_LEN
+ 1];
2555 gen_rand_alphanumeric(driver
->ctx(), cookie_buf
, sizeof(cookie_buf
) - 1);
2556 std::string cookie
= cookie_buf
;
2558 ret
= guard_lc_modify(dpp
,
2559 driver
, sal_lc
, bucket
->get_key(), cookie
,
2560 [&lc_oid
](rgw::sal::Lifecycle
* slc
,
2562 rgw::sal::Lifecycle::LCEntry
& entry
) {
2563 return slc
->set_entry(lc_oid
, entry
);
2571 std::string
s3_expiration_header(
2572 DoutPrefixProvider
* dpp
,
2573 const rgw_obj_key
& obj_key
,
2574 const RGWObjTags
& obj_tagset
,
2575 const ceph::real_time
& mtime
,
2576 const std::map
<std::string
, buffer::list
>& bucket_attrs
)
2578 CephContext
* cct
= dpp
->get_cct();
2579 RGWLifecycleConfiguration
config(cct
);
2580 std::string hdr
{""};
2582 const auto& aiter
= bucket_attrs
.find(RGW_ATTR_LC
);
2583 if (aiter
== bucket_attrs
.end())
2586 bufferlist::const_iterator iter
{&aiter
->second
};
2588 config
.decode(iter
);
2589 } catch (const buffer::error
& e
) {
2590 ldpp_dout(dpp
, 0) << __func__
2591 << "() decode life cycle config failed"
2596 /* dump tags at debug level 16 */
2597 RGWObjTags::tag_map_t obj_tag_map
= obj_tagset
.get_tags();
2598 if (cct
->_conf
->subsys
.should_gather(ceph_subsys_rgw
, 16)) {
2599 for (const auto& elt
: obj_tag_map
) {
2600 ldpp_dout(dpp
, 16) << __func__
2601 << "() key=" << elt
.first
<< " val=" << elt
.second
2606 boost::optional
<ceph::real_time
> expiration_date
;
2607 boost::optional
<std::string
> rule_id
;
2609 const auto& rule_map
= config
.get_rule_map();
2610 for (const auto& ri
: rule_map
) {
2611 const auto& rule
= ri
.second
;
2612 auto& id
= rule
.get_id();
2613 auto& filter
= rule
.get_filter();
2614 auto& prefix
= filter
.has_prefix() ? filter
.get_prefix(): rule
.get_prefix();
2615 auto& expiration
= rule
.get_expiration();
2616 auto& noncur_expiration
= rule
.get_noncur_expiration();
2618 ldpp_dout(dpp
, 10) << "rule: " << ri
.first
2619 << " prefix: " << prefix
2621 << " date: " << expiration
.get_date()
2622 << " days: " << expiration
.get_days()
2623 << " noncur_expiration: "
2624 << " date: " << noncur_expiration
.get_date()
2625 << " days: " << noncur_expiration
.get_days()
2628 /* skip if rule !enabled
2629 * if rule has prefix, skip iff object !match prefix
2630 * if rule has tags, skip iff object !match tags
2631 * note if object is current or non-current, compare accordingly
2632 * if rule has days, construct date expression and save iff older
2634 * if rule has date, convert date expression and save iff older
2636 * if the date accum has a value, format it into hdr
2639 if (! rule
.is_enabled())
2642 if(! prefix
.empty()) {
2643 if (! boost::starts_with(obj_key
.name
, prefix
))
2647 if (filter
.has_tags()) {
2648 bool tag_match
= false;
2649 const RGWObjTags
& rule_tagset
= filter
.get_tags();
2650 for (auto& tag
: rule_tagset
.get_tags()) {
2651 /* remember, S3 tags are {key,value} tuples */
2653 auto obj_tag
= obj_tag_map
.find(tag
.first
);
2654 if (obj_tag
== obj_tag_map
.end() || obj_tag
->second
!= tag
.second
) {
2655 ldpp_dout(dpp
, 10) << "tag does not match obj_key=" << obj_key
2656 << " rule_id=" << id
2667 // compute a uniform expiration date
2668 boost::optional
<ceph::real_time
> rule_expiration_date
;
2669 const LCExpiration
& rule_expiration
=
2670 (obj_key
.instance
.empty()) ? expiration
: noncur_expiration
;
2672 if (rule_expiration
.has_date()) {
2673 rule_expiration_date
=
2674 boost::optional
<ceph::real_time
>(
2675 ceph::from_iso_8601(rule
.get_expiration().get_date()));
2677 if (rule_expiration
.has_days()) {
2678 rule_expiration_date
=
2679 boost::optional
<ceph::real_time
>(
2680 mtime
+ make_timespan(double(rule_expiration
.get_days())*24*60*60 - ceph::real_clock::to_time_t(mtime
)%(24*60*60) + 24*60*60));
2684 // update earliest expiration
2685 if (rule_expiration_date
) {
2686 if ((! expiration_date
) ||
2687 (*expiration_date
> *rule_expiration_date
)) {
2689 boost::optional
<ceph::real_time
>(rule_expiration_date
);
2690 rule_id
= boost::optional
<std::string
>(id
);
2695 // cond format header
2696 if (expiration_date
&& rule_id
) {
2697 // Fri, 23 Dec 2012 00:00:00 GMT
2699 time_t exp
= ceph::real_clock::to_time_t(*expiration_date
);
2700 if (std::strftime(exp_buf
, sizeof(exp_buf
),
2701 "%a, %d %b %Y %T %Z", std::gmtime(&exp
))) {
2702 hdr
= fmt::format("expiry-date=\"{0}\", rule-id=\"{1}\"", exp_buf
,
2705 ldpp_dout(dpp
, 0) << __func__
<<
2706 "() strftime of life cycle expiration header failed"
2713 } /* rgwlc_s3_expiration_header */
2715 bool s3_multipart_abort_header(
2716 DoutPrefixProvider
* dpp
,
2717 const rgw_obj_key
& obj_key
,
2718 const ceph::real_time
& mtime
,
2719 const std::map
<std::string
, buffer::list
>& bucket_attrs
,
2720 ceph::real_time
& abort_date
,
2721 std::string
& rule_id
)
2723 CephContext
* cct
= dpp
->get_cct();
2724 RGWLifecycleConfiguration
config(cct
);
2726 const auto& aiter
= bucket_attrs
.find(RGW_ATTR_LC
);
2727 if (aiter
== bucket_attrs
.end())
2730 bufferlist::const_iterator iter
{&aiter
->second
};
2732 config
.decode(iter
);
2733 } catch (const buffer::error
& e
) {
2734 ldpp_dout(dpp
, 0) << __func__
2735 << "() decode life cycle config failed"
2740 std::optional
<ceph::real_time
> abort_date_tmp
;
2741 std::optional
<std::string_view
> rule_id_tmp
;
2742 const auto& rule_map
= config
.get_rule_map();
2743 for (const auto& ri
: rule_map
) {
2744 const auto& rule
= ri
.second
;
2745 const auto& id
= rule
.get_id();
2746 const auto& filter
= rule
.get_filter();
2747 const auto& prefix
= filter
.has_prefix()?filter
.get_prefix():rule
.get_prefix();
2748 const auto& mp_expiration
= rule
.get_mp_expiration();
2749 if (!rule
.is_enabled()) {
2752 if(!prefix
.empty() && !boost::starts_with(obj_key
.name
, prefix
)) {
2756 std::optional
<ceph::real_time
> rule_abort_date
;
2757 if (mp_expiration
.has_days()) {
2758 rule_abort_date
= std::optional
<ceph::real_time
>(
2759 mtime
+ make_timespan(mp_expiration
.get_days()*24*60*60 - ceph::real_clock::to_time_t(mtime
)%(24*60*60) + 24*60*60));
2762 // update earliest abort date
2763 if (rule_abort_date
) {
2764 if ((! abort_date_tmp
) ||
2765 (*abort_date_tmp
> *rule_abort_date
)) {
2767 std::optional
<ceph::real_time
>(rule_abort_date
);
2768 rule_id_tmp
= std::optional
<std::string_view
>(id
);
2772 if (abort_date_tmp
&& rule_id_tmp
) {
2773 abort_date
= *abort_date_tmp
;
2774 rule_id
= *rule_id_tmp
;
2781 } /* namespace rgw::lc */
2783 void lc_op::dump(Formatter
*f
) const
2785 f
->dump_bool("status", status
);
2786 f
->dump_bool("dm_expiration", dm_expiration
);
2788 f
->dump_int("expiration", expiration
);
2789 f
->dump_int("noncur_expiration", noncur_expiration
);
2790 f
->dump_int("mp_expiration", mp_expiration
);
2791 if (expiration_date
) {
2792 utime_t
ut(*expiration_date
);
2793 f
->dump_stream("expiration_date") << ut
;
2796 f
->dump_object("obj_tags", *obj_tags
);
2798 f
->open_object_section("transitions");
2799 for(auto& [storage_class
, transition
] : transitions
) {
2800 f
->dump_object(storage_class
, transition
);
2804 f
->open_object_section("noncur_transitions");
2805 for (auto& [storage_class
, transition
] : noncur_transitions
) {
2806 f
->dump_object(storage_class
, transition
);
2811 void LCFilter::dump(Formatter
*f
) const
2813 f
->dump_string("prefix", prefix
);
2814 f
->dump_object("obj_tags", obj_tags
);
2815 if (have_flag(LCFlagType::ArchiveZone
)) {
2816 f
->dump_string("archivezone", "");
2820 void LCExpiration::dump(Formatter
*f
) const
2822 f
->dump_string("days", days
);
2823 f
->dump_string("date", date
);
2826 void LCRule::dump(Formatter
*f
) const
2828 f
->dump_string("id", id
);
2829 f
->dump_string("prefix", prefix
);
2830 f
->dump_string("status", status
);
2831 f
->dump_object("expiration", expiration
);
2832 f
->dump_object("noncur_expiration", noncur_expiration
);
2833 f
->dump_object("mp_expiration", mp_expiration
);
2834 f
->dump_object("filter", filter
);
2835 f
->open_object_section("transitions");
2836 for (auto& [storage_class
, transition
] : transitions
) {
2837 f
->dump_object(storage_class
, transition
);
2841 f
->open_object_section("noncur_transitions");
2842 for (auto& [storage_class
, transition
] : noncur_transitions
) {
2843 f
->dump_object(storage_class
, transition
);
2846 f
->dump_bool("dm_expiration", dm_expiration
);
2850 void RGWLifecycleConfiguration::dump(Formatter
*f
) const
2852 f
->open_object_section("prefix_map");
2853 for (auto& prefix
: prefix_map
) {
2854 f
->dump_object(prefix
.first
.c_str(), prefix
.second
);
2858 f
->open_array_section("rule_map");
2859 for (auto& rule
: rule_map
) {
2860 f
->open_object_section("entry");
2861 f
->dump_string("id", rule
.first
);
2862 f
->open_object_section("rule");
2863 rule
.second
.dump(f
);