1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
11 #include <boost/algorithm/string/split.hpp>
12 #include <boost/algorithm/string.hpp>
13 #include <boost/algorithm/string/predicate.hpp>
14 #include <boost/variant.hpp>
16 #include "include/scope_guard.h"
17 #include "common/Formatter.h"
18 #include "common/containers.h"
19 #include <common/errno.h>
20 #include "include/random.h"
21 #include "cls/lock/cls_lock_client.h"
22 #include "rgw_perf_counters.h"
23 #include "rgw_common.h"
24 #include "rgw_bucket.h"
27 #include "rgw_string.h"
28 #include "rgw_multi.h"
29 #include "rgw_sal_rados.h"
30 #include "rgw_rados.h"
31 #include "rgw_lc_tier.h"
32 #include "rgw_notify.h"
34 // this seems safe to use, at least for now--arguably, we should
35 // prefer header-only fmt, in general
36 #undef FMT_HEADER_ONLY
37 #define FMT_HEADER_ONLY 1
38 #include "fmt/format.h"
40 #include "services/svc_sys_obj.h"
41 #include "services/svc_zone.h"
42 #include "services/svc_tier_rados.h"
44 #define dout_context g_ceph_context
45 #define dout_subsys ceph_subsys_rgw
49 const char* LC_STATUS
[] = {
56 using namespace librados
;
58 bool LCRule::valid() const
60 if (id
.length() > MAX_ID_LEN
) {
63 else if(expiration
.empty() && noncur_expiration
.empty() &&
64 mp_expiration
.empty() && !dm_expiration
&&
65 transitions
.empty() && noncur_transitions
.empty()) {
68 else if (!expiration
.valid() || !noncur_expiration
.valid() ||
69 !mp_expiration
.valid()) {
72 if (!transitions
.empty()) {
73 bool using_days
= expiration
.has_days();
74 bool using_date
= expiration
.has_date();
75 for (const auto& elem
: transitions
) {
76 if (!elem
.second
.valid()) {
79 using_days
= using_days
|| elem
.second
.has_days();
80 using_date
= using_date
|| elem
.second
.has_date();
81 if (using_days
&& using_date
) {
86 for (const auto& elem
: noncur_transitions
) {
87 if (!elem
.second
.valid()) {
95 void LCRule::init_simple_days_rule(std::string_view _id
,
96 std::string_view _prefix
, int num_days
)
101 snprintf(buf
, sizeof(buf
), "%d", num_days
);
102 expiration
.set_days(buf
);
106 void RGWLifecycleConfiguration::add_rule(const LCRule
& rule
)
108 auto& id
= rule
.get_id(); // note that this will return false for groups, but that's ok, we won't search groups
109 rule_map
.insert(pair
<string
, LCRule
>(id
, rule
));
112 bool RGWLifecycleConfiguration::_add_rule(const LCRule
& rule
)
114 lc_op
op(rule
.get_id());
115 op
.status
= rule
.is_enabled();
116 if (rule
.get_expiration().has_days()) {
117 op
.expiration
= rule
.get_expiration().get_days();
119 if (rule
.get_expiration().has_date()) {
120 op
.expiration_date
= ceph::from_iso_8601(rule
.get_expiration().get_date());
122 if (rule
.get_noncur_expiration().has_days()) {
123 op
.noncur_expiration
= rule
.get_noncur_expiration().get_days();
125 if (rule
.get_mp_expiration().has_days()) {
126 op
.mp_expiration
= rule
.get_mp_expiration().get_days();
128 op
.dm_expiration
= rule
.get_dm_expiration();
129 for (const auto &elem
: rule
.get_transitions()) {
130 transition_action action
;
131 if (elem
.second
.has_days()) {
132 action
.days
= elem
.second
.get_days();
134 action
.date
= ceph::from_iso_8601(elem
.second
.get_date());
137 = rgw_placement_rule::get_canonical_storage_class(elem
.first
);
138 op
.transitions
.emplace(elem
.first
, std::move(action
));
140 for (const auto &elem
: rule
.get_noncur_transitions()) {
141 transition_action action
;
142 action
.days
= elem
.second
.get_days();
143 action
.date
= ceph::from_iso_8601(elem
.second
.get_date());
144 action
.storage_class
= elem
.first
;
145 op
.noncur_transitions
.emplace(elem
.first
, std::move(action
));
148 if (rule
.get_filter().has_prefix()){
149 prefix
= rule
.get_filter().get_prefix();
151 prefix
= rule
.get_prefix();
154 if (rule
.get_filter().has_tags()){
155 op
.obj_tags
= rule
.get_filter().get_tags();
157 prefix_map
.emplace(std::move(prefix
), std::move(op
));
161 int RGWLifecycleConfiguration::check_and_add_rule(const LCRule
& rule
)
166 auto& id
= rule
.get_id();
167 if (rule_map
.find(id
) != rule_map
.end()) { //id shouldn't be the same
170 if (rule
.get_filter().has_tags() && (rule
.get_dm_expiration() ||
171 !rule
.get_mp_expiration().empty())) {
172 return -ERR_INVALID_REQUEST
;
174 rule_map
.insert(pair
<string
, LCRule
>(id
, rule
));
176 if (!_add_rule(rule
)) {
177 return -ERR_INVALID_REQUEST
;
182 bool RGWLifecycleConfiguration::has_same_action(const lc_op
& first
,
183 const lc_op
& second
) {
184 if ((first
.expiration
> 0 || first
.expiration_date
!= boost::none
) &&
185 (second
.expiration
> 0 || second
.expiration_date
!= boost::none
)) {
187 } else if (first
.noncur_expiration
> 0 && second
.noncur_expiration
> 0) {
189 } else if (first
.mp_expiration
> 0 && second
.mp_expiration
> 0) {
191 } else if (!first
.transitions
.empty() && !second
.transitions
.empty()) {
192 for (auto &elem
: first
.transitions
) {
193 if (second
.transitions
.find(elem
.first
) != second
.transitions
.end()) {
197 } else if (!first
.noncur_transitions
.empty() &&
198 !second
.noncur_transitions
.empty()) {
199 for (auto &elem
: first
.noncur_transitions
) {
200 if (second
.noncur_transitions
.find(elem
.first
) !=
201 second
.noncur_transitions
.end()) {
209 /* Formerly, this method checked for duplicate rules using an invalid
210 * method (prefix uniqueness). */
211 bool RGWLifecycleConfiguration::valid()
216 void *RGWLC::LCWorker::entry() {
218 std::unique_ptr
<rgw::sal::Bucket
> all_buckets
; // empty restriction
219 utime_t start
= ceph_clock_now();
220 if (should_work(start
)) {
221 ldpp_dout(dpp
, 2) << "life cycle: start" << dendl
;
222 int r
= lc
->process(this, all_buckets
, false /* once */);
224 ldpp_dout(dpp
, 0) << "ERROR: do life cycle process() returned error r="
227 ldpp_dout(dpp
, 2) << "life cycle: stop" << dendl
;
228 cloud_targets
.clear(); // clear cloud targets
230 if (lc
->going_down())
233 utime_t end
= ceph_clock_now();
234 int secs
= schedule_next_start_time(start
, end
);
236 next
.set_from_double(end
+ secs
);
238 ldpp_dout(dpp
, 5) << "schedule life cycle next start time: "
239 << rgw_to_asctime(next
) << dendl
;
241 std::unique_lock l
{lock
};
242 cond
.wait_for(l
, std::chrono::seconds(secs
));
243 } while (!lc
->going_down());
248 void RGWLC::initialize(CephContext
*_cct
, rgw::sal::Store
* _store
) {
251 sal_lc
= store
->get_lifecycle();
252 max_objs
= cct
->_conf
->rgw_lc_max_objs
;
253 if (max_objs
> HASH_PRIME
)
254 max_objs
= HASH_PRIME
;
256 obj_names
= new string
[max_objs
];
258 for (int i
= 0; i
< max_objs
; i
++) {
259 obj_names
[i
] = lc_oid_prefix
;
261 snprintf(buf
, 32, ".%d", i
);
262 obj_names
[i
].append(buf
);
265 #define COOKIE_LEN 16
266 char cookie_buf
[COOKIE_LEN
+ 1];
267 gen_rand_alphanumeric(cct
, cookie_buf
, sizeof(cookie_buf
) - 1);
271 void RGWLC::finalize()
276 bool RGWLC::if_already_run_today(time_t start_date
)
280 utime_t now
= ceph_clock_now();
281 localtime_r(&start_date
, &bdt
);
283 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
284 if (now
- start_date
< cct
->_conf
->rgw_lc_debug_interval
)
293 begin_of_day
= mktime(&bdt
);
294 if (now
- begin_of_day
< 24*60*60)
300 static inline std::ostream
& operator<<(std::ostream
&os
, rgw::sal::Lifecycle::LCEntry
& ent
) {
301 os
<< "<ent: bucket=";
303 os
<< "; start_time=";
304 os
<< rgw_to_asctime(utime_t(time_t(ent
.start_time
), 0));
311 int RGWLC::bucket_lc_prepare(int index
, LCWorker
* worker
)
313 vector
<rgw::sal::Lifecycle::LCEntry
> entries
;
316 ldpp_dout(this, 5) << "RGWLC::bucket_lc_prepare(): PREPARE "
317 << "index: " << index
<< " worker ix: " << worker
->ix
320 #define MAX_LC_LIST_ENTRIES 100
322 int ret
= sal_lc
->list_entries(obj_names
[index
], marker
, MAX_LC_LIST_ENTRIES
, entries
);
326 for (auto& entry
: entries
) {
327 entry
.start_time
= ceph_clock_now();
328 entry
.status
= lc_uninitial
; // lc_uninitial? really?
329 ret
= sal_lc
->set_entry(obj_names
[index
], entry
);
332 << "RGWLC::bucket_lc_prepare() failed to set entry on "
333 << obj_names
[index
] << dendl
;
338 if (! entries
.empty()) {
339 marker
= std::move(entries
.back().bucket
);
341 } while (!entries
.empty());
346 static bool obj_has_expired(const DoutPrefixProvider
*dpp
, CephContext
*cct
, ceph::real_time mtime
, int days
,
347 ceph::real_time
*expire_time
= nullptr)
349 double timediff
, cmp
;
351 if (cct
->_conf
->rgw_lc_debug_interval
<= 0) {
352 /* Normal case, run properly */
353 cmp
= double(days
)*24*60*60;
354 base_time
= ceph_clock_now().round_to_day();
356 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
357 cmp
= double(days
)*cct
->_conf
->rgw_lc_debug_interval
;
358 base_time
= ceph_clock_now();
360 auto tt_mtime
= ceph::real_clock::to_time_t(mtime
);
361 timediff
= base_time
- tt_mtime
;
364 *expire_time
= mtime
+ make_timespan(cmp
);
367 ldpp_dout(dpp
, 20) << __func__
368 << "(): mtime=" << mtime
<< " days=" << days
369 << " base_time=" << base_time
<< " timediff=" << timediff
371 << " is_expired=" << (timediff
>= cmp
)
374 return (timediff
>= cmp
);
377 static bool pass_object_lock_check(rgw::sal::Store
* store
, rgw::sal::Object
* obj
, RGWObjectCtx
& ctx
, const DoutPrefixProvider
*dpp
)
379 if (!obj
->get_bucket()->get_info().obj_lock_enabled()) {
382 std::unique_ptr
<rgw::sal::Object::ReadOp
> read_op
= obj
->get_read_op(&ctx
);
383 int ret
= read_op
->prepare(null_yield
, dpp
);
385 if (ret
== -ENOENT
) {
391 auto iter
= obj
->get_attrs().find(RGW_ATTR_OBJECT_RETENTION
);
392 if (iter
!= obj
->get_attrs().end()) {
393 RGWObjectRetention retention
;
395 decode(retention
, iter
->second
);
396 } catch (buffer::error
& err
) {
397 ldpp_dout(dpp
, 0) << "ERROR: failed to decode RGWObjectRetention"
401 if (ceph::real_clock::to_time_t(retention
.get_retain_until_date()) >
406 iter
= obj
->get_attrs().find(RGW_ATTR_OBJECT_LEGAL_HOLD
);
407 if (iter
!= obj
->get_attrs().end()) {
408 RGWObjectLegalHold obj_legal_hold
;
410 decode(obj_legal_hold
, iter
->second
);
411 } catch (buffer::error
& err
) {
412 ldpp_dout(dpp
, 0) << "ERROR: failed to decode RGWObjectLegalHold"
416 if (obj_legal_hold
.is_enabled()) {
425 rgw::sal::Store
* store
;
426 rgw::sal::Bucket
* bucket
;
427 rgw::sal::Bucket::ListParams list_params
;
428 rgw::sal::Bucket::ListResults list_results
;
430 vector
<rgw_bucket_dir_entry
>::iterator obj_iter
;
431 rgw_bucket_dir_entry pre_obj
;
435 LCObjsLister(rgw::sal::Store
* _store
, rgw::sal::Bucket
* _bucket
) :
436 store(_store
), bucket(_bucket
) {
437 list_params
.list_versions
= bucket
->versioned();
438 list_params
.allow_unordered
= true;
439 delay_ms
= store
->ctx()->_conf
.get_val
<int64_t>("rgw_lc_thread_delay");
442 void set_prefix(const string
& p
) {
444 list_params
.prefix
= prefix
;
447 int init(const DoutPrefixProvider
*dpp
) {
451 int fetch(const DoutPrefixProvider
*dpp
) {
452 int ret
= bucket
->list(dpp
, list_params
, 1000, list_results
, null_yield
);
457 obj_iter
= list_results
.objs
.begin();
463 std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms
));
466 bool get_obj(const DoutPrefixProvider
*dpp
, rgw_bucket_dir_entry
**obj
,
467 std::function
<void(void)> fetch_barrier
468 = []() { /* nada */}) {
469 if (obj_iter
== list_results
.objs
.end()) {
470 if (!list_results
.is_truncated
) {
475 list_params
.marker
= pre_obj
.key
;
476 int ret
= fetch(dpp
);
478 ldpp_dout(dpp
, 0) << "ERROR: list_op returned ret=" << ret
485 /* returning address of entry in objs */
487 return obj_iter
!= list_results
.objs
.end();
490 rgw_bucket_dir_entry
get_prev_obj() {
499 boost::optional
<std::string
> next_key_name() {
500 if (obj_iter
== list_results
.objs
.end() ||
501 (obj_iter
+ 1) == list_results
.objs
.end()) {
502 /* this should have been called after get_obj() was called, so this should
503 * only happen if is_truncated is false */
507 return ((obj_iter
+ 1)->key
.name
);
510 }; /* LCObjsLister */
514 using LCWorker
= RGWLC::LCWorker
;
517 rgw::sal::Store
* store
;
519 rgw::sal::Bucket
* bucket
;
522 op_env(lc_op
& _op
, rgw::sal::Store
* _store
, LCWorker
* _worker
,
523 rgw::sal::Bucket
* _bucket
, LCObjsLister
& _ol
)
524 : op(_op
), store(_store
), worker(_worker
), bucket(_bucket
),
534 rgw_bucket_dir_entry o
;
535 boost::optional
<std::string
> next_key_name
;
536 ceph::real_time effective_mtime
;
538 rgw::sal::Store
* store
;
539 rgw::sal::Bucket
* bucket
;
540 lc_op
& op
; // ok--refers to expanded env.op
543 std::unique_ptr
<rgw::sal::Object
> obj
;
545 const DoutPrefixProvider
*dpp
;
548 RGWZoneGroupPlacementTier tier
= {};
550 lc_op_ctx(op_env
& env
, rgw_bucket_dir_entry
& o
,
551 boost::optional
<std::string
> next_key_name
,
552 ceph::real_time effective_mtime
,
553 const DoutPrefixProvider
*dpp
, WorkQ
* wq
)
554 : cct(env
.store
->ctx()), env(env
), o(o
), next_key_name(next_key_name
),
555 effective_mtime(effective_mtime
),
556 store(env
.store
), bucket(env
.bucket
), op(env
.op
), ol(env
.ol
),
557 rctx(env
.store
), dpp(dpp
), wq(wq
)
559 obj
= bucket
->get_object(o
.key
);
562 bool next_has_same_name(const std::string
& key_name
) {
563 return (next_key_name
&& key_name
.compare(
564 boost::get
<std::string
>(next_key_name
)) == 0);
570 static std::string lc_id
= "rgw lifecycle";
571 static std::string lc_req_id
= "0";
573 static int remove_expired_obj(
574 const DoutPrefixProvider
*dpp
, lc_op_ctx
& oc
, bool remove_indeed
,
575 rgw::notify::EventType event_type
)
577 auto& store
= oc
.store
;
578 auto& bucket_info
= oc
.bucket
->get_info();
580 auto obj_key
= o
.key
;
583 std::string version_id
;
585 if (!remove_indeed
) {
586 obj_key
.instance
.clear();
587 } else if (obj_key
.instance
.empty()) {
588 obj_key
.instance
= "null";
591 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
592 std::unique_ptr
<rgw::sal::Object
> obj
;
594 ret
= store
->get_bucket(nullptr, bucket_info
, &bucket
);
599 // XXXX currently, rgw::sal::Bucket.owner is always null here
600 std::unique_ptr
<rgw::sal::User
> user
;
601 if (! bucket
->get_owner()) {
602 auto& bucket_info
= bucket
->get_info();
603 user
= store
->get_user(bucket_info
.owner
);
606 bucket
->set_owner(user
.get());
610 obj
= bucket
->get_object(obj_key
);
611 std::unique_ptr
<rgw::sal::Object::DeleteOp
> del_op
612 = obj
->get_delete_op(&oc
.rctx
);
613 del_op
->params
.versioning_status
614 = obj
->get_bucket()->get_info().versioning_status();
615 del_op
->params
.obj_owner
.set_id(rgw_user
{meta
.owner
});
616 del_op
->params
.obj_owner
.set_name(meta
.owner_display_name
);
617 del_op
->params
.bucket_owner
.set_id(bucket_info
.owner
);
618 del_op
->params
.unmod_since
= meta
.mtime
;
619 del_op
->params
.marker_version_id
= version_id
;
621 std::unique_ptr
<rgw::sal::Notification
> notify
622 = store
->get_notification(dpp
, obj
.get(), nullptr, &oc
.rctx
, event_type
,
624 const_cast<std::string
&>(oc
.bucket
->get_tenant()),
625 lc_req_id
, null_yield
);
627 /* can eliminate cast when reservation is lifted into Notification */
628 auto notify_res
= static_cast<rgw::sal::RadosNotification
*>(notify
.get())->get_reservation();
630 ret
= rgw::notify::publish_reserve(dpp
, event_type
, notify_res
, nullptr);
633 << "ERROR: notify reservation failed, deferring delete of object k="
639 ret
= del_op
->delete_obj(dpp
, null_yield
);
642 "ERROR: publishing notification failed, with error: " << ret
<< dendl
;
644 // send request to notification manager
645 (void) rgw::notify::publish_commit(
646 obj
.get(), obj
->get_obj_size(), ceph::real_clock::now(),
647 obj
->get_attrs()[RGW_ATTR_ETAG
].to_str(), version_id
, event_type
,
653 } /* remove_expired_obj */
657 virtual ~LCOpAction() {}
659 virtual bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
, const DoutPrefixProvider
*dpp
) {
663 /* called after check(). Check should tell us whether this action
664 * is applicable. If there are multiple actions, we'll end up executing
665 * the latest applicable action
667 * one action after 10 days, another after 20, third after 40.
668 * After 10 days, the latest applicable action would be the first one,
669 * after 20 days it will be the second one. After 21 days it will still be the
670 * second one. So check() should return true for the second action at that point,
671 * but should_process() if the action has already been applied. In object removal
672 * it doesn't matter, but in object transition it does.
674 virtual bool should_process() {
678 virtual int process(lc_op_ctx
& oc
) {
682 friend class LCOpRule
;
687 virtual ~LCOpFilter() {}
688 virtual bool check(const DoutPrefixProvider
*dpp
, lc_op_ctx
& oc
) {
694 friend class LCOpAction
;
697 boost::optional
<std::string
> next_key_name
;
698 ceph::real_time effective_mtime
;
700 std::vector
<shared_ptr
<LCOpFilter
> > filters
; // n.b., sharing ovhd
701 std::vector
<shared_ptr
<LCOpAction
> > actions
;
704 LCOpRule(op_env
& _env
) : env(_env
) {}
706 boost::optional
<std::string
> get_next_key_name() {
707 return next_key_name
;
710 std::vector
<shared_ptr
<LCOpAction
>>& get_actions() {
716 int process(rgw_bucket_dir_entry
& o
, const DoutPrefixProvider
*dpp
,
721 boost::variant
<void*,
722 /* out-of-line delete */
723 std::tuple
<LCOpRule
, rgw_bucket_dir_entry
>,
724 /* uncompleted MPU expiration */
725 std::tuple
<lc_op
, rgw_bucket_dir_entry
>,
726 rgw_bucket_dir_entry
>;
728 class WorkQ
: public Thread
731 using unique_lock
= std::unique_lock
<std::mutex
>;
732 using work_f
= std::function
<void(RGWLC::LCWorker
*, WorkQ
*, WorkItem
&)>;
733 using dequeue_result
= boost::variant
<void*, WorkItem
>;
735 static constexpr uint32_t FLAG_NONE
= 0x0000;
736 static constexpr uint32_t FLAG_EWAIT_SYNC
= 0x0001;
737 static constexpr uint32_t FLAG_DWAIT_SYNC
= 0x0002;
738 static constexpr uint32_t FLAG_EDRAIN_SYNC
= 0x0004;
741 const work_f bsf
= [](RGWLC::LCWorker
* wk
, WorkQ
* wq
, WorkItem
& wi
) {};
746 std::condition_variable cv
;
748 vector
<WorkItem
> items
;
752 WorkQ(RGWLC::LCWorker
* wk
, uint32_t ix
, uint32_t qmax
)
753 : wk(wk
), qmax(qmax
), ix(ix
), flags(FLAG_NONE
), f(bsf
)
755 create(thr_name().c_str());
758 std::string
thr_name() {
759 return std::string
{"wp_thrd: "}
760 + std::to_string(wk
->ix
) + ", " + std::to_string(ix
);
763 void setf(work_f _f
) {
767 void enqueue(WorkItem
&& item
) {
768 unique_lock
uniq(mtx
);
769 while ((!wk
->get_lc()->going_down()) &&
770 (items
.size() > qmax
)) {
771 flags
|= FLAG_EWAIT_SYNC
;
772 cv
.wait_for(uniq
, 200ms
);
774 items
.push_back(item
);
775 if (flags
& FLAG_DWAIT_SYNC
) {
776 flags
&= ~FLAG_DWAIT_SYNC
;
782 unique_lock
uniq(mtx
);
783 flags
|= FLAG_EDRAIN_SYNC
;
784 while (flags
& FLAG_EDRAIN_SYNC
) {
785 cv
.wait_for(uniq
, 200ms
);
790 dequeue_result
dequeue() {
791 unique_lock
uniq(mtx
);
792 while ((!wk
->get_lc()->going_down()) &&
793 (items
.size() == 0)) {
794 /* clear drain state, as we are NOT doing work and qlen==0 */
795 if (flags
& FLAG_EDRAIN_SYNC
) {
796 flags
&= ~FLAG_EDRAIN_SYNC
;
798 flags
|= FLAG_DWAIT_SYNC
;
799 cv
.wait_for(uniq
, 200ms
);
801 if (items
.size() > 0) {
802 auto item
= items
.back();
804 if (flags
& FLAG_EWAIT_SYNC
) {
805 flags
&= ~FLAG_EWAIT_SYNC
;
813 void* entry() override
{
814 while (!wk
->get_lc()->going_down()) {
815 auto item
= dequeue();
816 if (item
.which() == 0) {
820 f(wk
, this, boost::get
<WorkItem
>(item
));
826 class RGWLC::WorkPool
828 using TVector
= ceph::containers::tiny_vector
<WorkQ
, 3>;
833 WorkPool(RGWLC::LCWorker
* wk
, uint16_t n_threads
, uint32_t qmax
)
836 [&](const size_t ix
, auto emplacer
) {
837 emplacer
.emplace(wk
, ix
, qmax
);
843 for (auto& wq
: wqs
) {
848 void setf(WorkQ::work_f _f
) {
849 for (auto& wq
: wqs
) {
854 void enqueue(WorkItem item
) {
856 ix
= (ix
+1) % wqs
.size();
857 (wqs
[tix
]).enqueue(std::move(item
));
861 for (auto& wq
: wqs
) {
867 RGWLC::LCWorker::LCWorker(const DoutPrefixProvider
* dpp
, CephContext
*cct
,
869 : dpp(dpp
), cct(cct
), lc(lc
), ix(ix
)
871 auto wpw
= cct
->_conf
.get_val
<int64_t>("rgw_lc_max_wp_worker");
872 workpool
= new WorkPool(this, wpw
, 512);
/// Decide whether the LC worker must abandon its current pass.
/// In run-once mode the pass always runs to completion; otherwise we
/// stop as soon as the wall clock has moved past the stop_at deadline.
static inline bool worker_should_stop(time_t stop_at, bool once)
{
  if (once) {
    return false;
  }
  return stop_at < time(nullptr);
}
880 int RGWLC::handle_multipart_expiration(rgw::sal::Bucket
* target
,
881 const multimap
<string
, lc_op
>& prefix_map
,
882 LCWorker
* worker
, time_t stop_at
, bool once
)
884 MultipartMetaFilter mp_filter
;
886 rgw::sal::Bucket::ListParams params
;
887 rgw::sal::Bucket::ListResults results
;
888 auto delay_ms
= cct
->_conf
.get_val
<int64_t>("rgw_lc_thread_delay");
889 params
.list_versions
= false;
890 /* lifecycle processing does not depend on total order, so can
891 * take advantage of unordered listing optimizations--such as
892 * operating on one shard at a time */
893 params
.allow_unordered
= true;
894 params
.ns
= RGW_OBJ_NS_MULTIPART
;
895 params
.access_list_filter
= &mp_filter
;
897 auto pf
= [&](RGWLC::LCWorker
* wk
, WorkQ
* wq
, WorkItem
& wi
) {
898 auto wt
= boost::get
<std::tuple
<lc_op
, rgw_bucket_dir_entry
>>(wi
);
899 auto& [rule
, obj
] = wt
;
900 if (obj_has_expired(this, cct
, obj
.meta
.mtime
, rule
.mp_expiration
)) {
901 rgw_obj_key
key(obj
.key
);
902 std::unique_ptr
<rgw::sal::MultipartUpload
> mpu
= target
->get_multipart_upload(key
.name
);
903 RGWObjectCtx
rctx(store
);
904 int ret
= mpu
->abort(this, cct
, &rctx
);
907 perfcounter
->inc(l_rgw_lc_abort_mpu
, 1);
910 if (ret
== -ERR_NO_SUCH_UPLOAD
) {
911 ldpp_dout(wk
->get_lc(), 5)
912 << "ERROR: abort_multipart_upload failed, ret=" << ret
913 << ", thread:" << wq
->thr_name()
914 << ", meta:" << obj
.key
917 ldpp_dout(wk
->get_lc(), 0)
918 << "ERROR: abort_multipart_upload failed, ret=" << ret
919 << ", thread:" << wq
->thr_name()
920 << ", meta:" << obj
.key
927 worker
->workpool
->setf(pf
);
929 for (auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end();
932 if (worker_should_stop(stop_at
, once
)) {
933 ldpp_dout(this, 5) << __func__
<< " interval budget EXPIRED worker "
939 if (!prefix_iter
->second
.status
|| prefix_iter
->second
.mp_expiration
<= 0) {
942 params
.prefix
= prefix_iter
->first
;
944 results
.objs
.clear();
945 ret
= target
->list(this, params
, 1000, results
, null_yield
);
947 if (ret
== (-ENOENT
))
949 ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl
;
953 for (auto obj_iter
= results
.objs
.begin(); obj_iter
!= results
.objs
.end(); ++obj_iter
) {
954 std::tuple
<lc_op
, rgw_bucket_dir_entry
> t1
=
955 {prefix_iter
->second
, *obj_iter
};
956 worker
->workpool
->enqueue(WorkItem
{t1
});
962 std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms
));
963 } while(results
.is_truncated
);
964 } /* for prefix_map */
966 worker
->workpool
->drain();
970 static int read_obj_tags(const DoutPrefixProvider
*dpp
, rgw::sal::Object
* obj
, RGWObjectCtx
& ctx
, bufferlist
& tags_bl
)
972 std::unique_ptr
<rgw::sal::Object::ReadOp
> rop
= obj
->get_read_op(&ctx
);
974 return rop
->get_attr(dpp
, RGW_ATTR_TAGS
, tags_bl
, null_yield
);
977 static bool is_valid_op(const lc_op
& op
)
981 || op
.expiration_date
!= boost::none
982 || op
.noncur_expiration
> 0
984 || !op
.transitions
.empty()
985 || !op
.noncur_transitions
.empty()));
988 static inline bool has_all_tags(const lc_op
& rule_action
,
989 const RGWObjTags
& object_tags
)
991 if(! rule_action
.obj_tags
)
993 if(object_tags
.count() < rule_action
.obj_tags
->count())
995 size_t tag_count
= 0;
996 for (const auto& tag
: object_tags
.get_tags()) {
997 const auto& rule_tags
= rule_action
.obj_tags
->get_tags();
998 const auto& iter
= rule_tags
.find(tag
.first
);
999 if(iter
== rule_tags
.end())
1001 if(iter
->second
== tag
.second
)
1005 /* all tags in the rule appear in obj tags */
1007 return tag_count
== rule_action
.obj_tags
->count();
1010 static int check_tags(const DoutPrefixProvider
*dpp
, lc_op_ctx
& oc
, bool *skip
)
1014 if (op
.obj_tags
!= boost::none
) {
1018 int ret
= read_obj_tags(dpp
, oc
.obj
.get(), oc
.rctx
, tags_bl
);
1020 if (ret
!= -ENODATA
) {
1021 ldpp_dout(oc
.dpp
, 5) << "ERROR: read_obj_tags returned r="
1022 << ret
<< " " << oc
.wq
->thr_name() << dendl
;
1026 RGWObjTags dest_obj_tags
;
1028 auto iter
= tags_bl
.cbegin();
1029 dest_obj_tags
.decode(iter
);
1030 } catch (buffer::error
& err
) {
1031 ldpp_dout(oc
.dpp
,0) << "ERROR: caught buffer::error, couldn't decode TagSet "
1032 << oc
.wq
->thr_name() << dendl
;
1036 if (! has_all_tags(op
, dest_obj_tags
)) {
1037 ldpp_dout(oc
.dpp
, 20) << __func__
<< "() skipping obj " << oc
.obj
1038 << " as tags do not match in rule: "
1040 << oc
.wq
->thr_name() << dendl
;
1048 class LCOpFilter_Tags
: public LCOpFilter
{
1050 bool check(const DoutPrefixProvider
*dpp
, lc_op_ctx
& oc
) override
{
1053 if (o
.is_delete_marker()) {
1059 int ret
= check_tags(dpp
, oc
, &skip
);
1061 if (ret
== -ENOENT
) {
1064 ldpp_dout(oc
.dpp
, 0) << "ERROR: check_tags on obj=" << oc
.obj
1065 << " returned ret=" << ret
<< " "
1066 << oc
.wq
->thr_name() << dendl
;
1074 class LCOpAction_CurrentExpiration
: public LCOpAction
{
1076 LCOpAction_CurrentExpiration(op_env
& env
) {}
1078 bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
, const DoutPrefixProvider
*dpp
) override
{
1080 if (!o
.is_current()) {
1081 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
1082 << ": not current, skipping "
1083 << oc
.wq
->thr_name() << dendl
;
1086 if (o
.is_delete_marker()) {
1088 if (oc
.next_key_name
) nkn
= *oc
.next_key_name
;
1089 if (oc
.next_has_same_name(o
.key
.name
)) {
1090 ldpp_dout(dpp
, 7) << __func__
<< "(): dm-check SAME: key=" << o
.key
1091 << " next_key_name: %%" << nkn
<< "%% "
1092 << oc
.wq
->thr_name() << dendl
;
1095 ldpp_dout(dpp
, 7) << __func__
<< "(): dm-check DELE: key=" << o
.key
1096 << " next_key_name: %%" << nkn
<< "%% "
1097 << oc
.wq
->thr_name() << dendl
;
1098 *exp_time
= real_clock::now();
1103 auto& mtime
= o
.meta
.mtime
;
1106 if (op
.expiration
<= 0) {
1107 if (op
.expiration_date
== boost::none
) {
1108 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
1109 << ": no expiration set in rule, skipping "
1110 << oc
.wq
->thr_name() << dendl
;
1113 is_expired
= ceph_clock_now() >=
1114 ceph::real_clock::to_time_t(*op
.expiration_date
);
1115 *exp_time
= *op
.expiration_date
;
1117 is_expired
= obj_has_expired(dpp
, oc
.cct
, mtime
, op
.expiration
, exp_time
);
1120 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
<< ": is_expired="
1121 << (int)is_expired
<< " "
1122 << oc
.wq
->thr_name() << dendl
;
1126 int process(lc_op_ctx
& oc
) {
1129 if (o
.is_delete_marker()) {
1130 r
= remove_expired_obj(oc
.dpp
, oc
, true,
1131 rgw::notify::ObjectExpirationDeleteMarker
);
1133 ldpp_dout(oc
.dpp
, 0) << "ERROR: current is-dm remove_expired_obj "
1134 << oc
.bucket
<< ":" << o
.key
1135 << " " << cpp_strerror(r
) << " "
1136 << oc
.wq
->thr_name() << dendl
;
1139 ldpp_dout(oc
.dpp
, 2) << "DELETED: current is-dm "
1140 << oc
.bucket
<< ":" << o
.key
1141 << " " << oc
.wq
->thr_name() << dendl
;
1143 /* ! o.is_delete_marker() */
1144 r
= remove_expired_obj(oc
.dpp
, oc
, !oc
.bucket
->versioned(),
1145 rgw::notify::ObjectExpirationCurrent
);
1147 ldpp_dout(oc
.dpp
, 0) << "ERROR: remove_expired_obj "
1148 << oc
.bucket
<< ":" << o
.key
1149 << " " << cpp_strerror(r
) << " "
1150 << oc
.wq
->thr_name() << dendl
;
1154 perfcounter
->inc(l_rgw_lc_expire_current
, 1);
1156 ldpp_dout(oc
.dpp
, 2) << "DELETED:" << oc
.bucket
<< ":" << o
.key
1157 << " " << oc
.wq
->thr_name() << dendl
;
1163 class LCOpAction_NonCurrentExpiration
: public LCOpAction
{
1166 LCOpAction_NonCurrentExpiration(op_env
& env
)
1169 bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
, const DoutPrefixProvider
*dpp
) override
{
1171 if (o
.is_current()) {
1172 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
1173 << ": current version, skipping "
1174 << oc
.wq
->thr_name() << dendl
;
1178 int expiration
= oc
.op
.noncur_expiration
;
1179 bool is_expired
= obj_has_expired(dpp
, oc
.cct
, oc
.effective_mtime
, expiration
,
1182 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
<< ": is_expired="
1183 << is_expired
<< " "
1184 << oc
.wq
->thr_name() << dendl
;
1186 return is_expired
&&
1187 pass_object_lock_check(oc
.store
, oc
.obj
.get(), oc
.rctx
, dpp
);
1190 int process(lc_op_ctx
& oc
) {
1192 int r
= remove_expired_obj(oc
.dpp
, oc
, true,
1193 rgw::notify::ObjectExpirationNoncurrent
);
1195 ldpp_dout(oc
.dpp
, 0) << "ERROR: remove_expired_obj (non-current expiration) "
1196 << oc
.bucket
<< ":" << o
.key
1197 << " " << cpp_strerror(r
)
1198 << " " << oc
.wq
->thr_name() << dendl
;
1202 perfcounter
->inc(l_rgw_lc_expire_noncurrent
, 1);
1204 ldpp_dout(oc
.dpp
, 2) << "DELETED:" << oc
.bucket
<< ":" << o
.key
1205 << " (non-current expiration) "
1206 << oc
.wq
->thr_name() << dendl
;
1211 class LCOpAction_DMExpiration
: public LCOpAction
{
1213 LCOpAction_DMExpiration(op_env
& env
) {}
1215 bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
, const DoutPrefixProvider
*dpp
) override
{
1217 if (!o
.is_delete_marker()) {
1218 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
1219 << ": not a delete marker, skipping "
1220 << oc
.wq
->thr_name() << dendl
;
1223 if (oc
.next_has_same_name(o
.key
.name
)) {
1224 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
1225 << ": next is same object, skipping "
1226 << oc
.wq
->thr_name() << dendl
;
1230 *exp_time
= real_clock::now();
1235 int process(lc_op_ctx
& oc
) {
1237 int r
= remove_expired_obj(oc
.dpp
, oc
, true,
1238 rgw::notify::ObjectExpirationDeleteMarker
);
1240 ldpp_dout(oc
.dpp
, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
1241 << oc
.bucket
<< ":" << o
.key
1242 << " " << cpp_strerror(r
)
1243 << " " << oc
.wq
->thr_name()
1248 perfcounter
->inc(l_rgw_lc_expire_dm
, 1);
1250 ldpp_dout(oc
.dpp
, 2) << "DELETED:" << oc
.bucket
<< ":" << o
.key
1251 << " (delete marker expiration) "
1252 << oc
.wq
->thr_name() << dendl
;
1257 class LCOpAction_Transition
: public LCOpAction
{
1258 const transition_action
& transition
;
1259 bool need_to_process
{false};
1262 virtual bool check_current_state(bool is_current
) = 0;
1263 virtual ceph::real_time
get_effective_mtime(lc_op_ctx
& oc
) = 0;
1265 LCOpAction_Transition(const transition_action
& _transition
)
1266 : transition(_transition
) {}
1268 bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
, const DoutPrefixProvider
*dpp
) override
{
1271 if (o
.is_delete_marker()) {
1275 if (!check_current_state(o
.is_current())) {
1279 auto mtime
= get_effective_mtime(oc
);
1281 if (transition
.days
< 0) {
1282 if (transition
.date
== boost::none
) {
1283 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
1284 << ": no transition day/date set in rule, skipping "
1285 << oc
.wq
->thr_name() << dendl
;
1288 is_expired
= ceph_clock_now() >=
1289 ceph::real_clock::to_time_t(*transition
.date
);
1290 *exp_time
= *transition
.date
;
1292 is_expired
= obj_has_expired(dpp
, oc
.cct
, mtime
, transition
.days
, exp_time
);
1295 ldpp_dout(oc
.dpp
, 20) << __func__
<< "(): key=" << o
.key
<< ": is_expired="
1296 << is_expired
<< " "
1297 << oc
.wq
->thr_name() << dendl
;
1300 (rgw_placement_rule::get_canonical_storage_class(o
.meta
.storage_class
) !=
1301 transition
.storage_class
);
1306 bool should_process() override
{
1307 return need_to_process
;
1310 /* find out if the the storage class is remote cloud */
1311 int get_tier_target(const RGWZoneGroup
&zonegroup
, const rgw_placement_rule
& rule
,
1312 RGWZoneGroupPlacementTier
&tier
) {
1313 std::map
<std::string
, RGWZoneGroupPlacementTarget
>::const_iterator titer
;
1314 titer
= zonegroup
.placement_targets
.find(rule
.name
);
1315 if (titer
== zonegroup
.placement_targets
.end()) {
1319 const auto& target_rule
= titer
->second
;
1320 std::map
<std::string
, RGWZoneGroupPlacementTier
>::const_iterator ttier
;
1321 ttier
= target_rule
.tier_targets
.find(rule
.storage_class
);
1322 if (ttier
!= target_rule
.tier_targets
.end()) {
1323 tier
= ttier
->second
;
1324 } else { // not found
1330 int delete_tier_obj(lc_op_ctx
& oc
) {
1333 /* If bucket is versioned, create delete_marker for current version
1335 if (oc
.bucket
->versioned() && oc
.o
.is_current() && !oc
.o
.is_delete_marker()) {
1336 ret
= remove_expired_obj(oc
.dpp
, oc
, false, rgw::notify::ObjectExpiration
);
1337 ldpp_dout(oc
.dpp
, 20) << "delete_tier_obj Object(key:" << oc
.o
.key
<< ") current & not delete_marker" << " versioned_epoch: " << oc
.o
.versioned_epoch
<< "flags: " << oc
.o
.flags
<< dendl
;
1339 ret
= remove_expired_obj(oc
.dpp
, oc
, true, rgw::notify::ObjectExpiration
);
1340 ldpp_dout(oc
.dpp
, 20) << "delete_tier_obj Object(key:" << oc
.o
.key
<< ") not current " << "versioned_epoch: " << oc
.o
.versioned_epoch
<< "flags: " << oc
.o
.flags
<< dendl
;
1345 int update_tier_obj(lc_op_ctx
& oc
, RGWLCCloudTierCtx
& tier_ctx
) {
1347 map
<string
, bufferlist
> attrs
;
1350 real_time read_mtime
;
1352 std::unique_ptr
<rgw::sal::Object::ReadOp
> read_op(oc
.obj
->get_read_op(&oc
.rctx
));
1354 read_op
->params
.lastmod
= &read_mtime
;
1356 r
= read_op
->prepare(null_yield
, oc
.dpp
);
1361 if (read_mtime
!= tier_ctx
.o
.meta
.mtime
) {
1366 attrs
= oc
.obj
->get_attrs();
1368 rgw::sal::RadosStore
*rados
= static_cast<rgw::sal::RadosStore
*>(oc
.store
);
1369 RGWRados::Object
op_target(rados
->getRados(), oc
.bucket
->get_info(), oc
.rctx
, oc
.obj
->get_obj());
1370 RGWRados::Object::Write
obj_op(&op_target
);
1372 obj_op
.meta
.modify_tail
= true;
1373 obj_op
.meta
.flags
= PUT_OBJ_CREATE
;
1374 obj_op
.meta
.category
= RGWObjCategory::CloudTiered
;
1375 obj_op
.meta
.delete_at
= real_time();
1377 obj_op
.meta
.data
= &blo
;
1378 obj_op
.meta
.if_match
= NULL
;
1379 obj_op
.meta
.if_nomatch
= NULL
;
1380 obj_op
.meta
.user_data
= NULL
;
1381 obj_op
.meta
.zones_trace
= NULL
;
1382 obj_op
.meta
.delete_at
= real_time();
1383 obj_op
.meta
.olh_epoch
= tier_ctx
.o
.versioned_epoch
;
1385 RGWObjManifest
*pmanifest
;
1386 RGWObjManifest manifest
;
1388 pmanifest
= &manifest
;
1389 RGWObjTier tier_config
;
1390 tier_config
.name
= oc
.tier
.storage_class
;
1391 tier_config
.tier_placement
= oc
.tier
;
1392 tier_config
.is_multipart_upload
= tier_ctx
.is_multipart_upload
;
1394 pmanifest
->set_tier_type("cloud-s3");
1395 pmanifest
->set_tier_config(tier_config
);
1397 /* check if its necessary */
1398 rgw_placement_rule target_placement
;
1399 target_placement
.inherit_from(tier_ctx
.bucket_info
.placement_rule
);
1400 target_placement
.storage_class
= oc
.tier
.storage_class
;
1401 pmanifest
->set_head(target_placement
, tier_ctx
.obj
->get_obj(), 0);
1403 pmanifest
->set_tail_placement(target_placement
, tier_ctx
.obj
->get_obj().bucket
);
1405 pmanifest
->set_obj_size(0);
1407 obj_op
.meta
.manifest
= pmanifest
;
1409 /* update storage class */
1411 bl
.append(oc
.tier
.storage_class
);
1412 attrs
[RGW_ATTR_STORAGE_CLASS
] = bl
;
1414 attrs
.erase(RGW_ATTR_ID_TAG
);
1415 attrs
.erase(RGW_ATTR_TAIL_TAG
);
1417 r
= obj_op
.write_meta(oc
.dpp
, 0, 0, attrs
, null_yield
);
1425 int transition_obj_to_cloud(lc_op_ctx
& oc
) {
1427 string id
= "cloudid";
1428 string endpoint
= oc
.tier
.t
.s3
.endpoint
;
1429 RGWAccessKey key
= oc
.tier
.t
.s3
.key
;
1430 string region
= oc
.tier
.t
.s3
.region
;
1431 HostStyle host_style
= oc
.tier
.t
.s3
.host_style
;
1432 string bucket_name
= oc
.tier
.t
.s3
.target_path
;
1433 const RGWZoneGroup
& zonegroup
= oc
.store
->get_zone()->get_zonegroup();
1436 /* If CurrentVersion object, remove it & create delete marker */
1437 delete_object
= (!oc
.tier
.retain_head_object
||
1438 (oc
.o
.is_current() && oc
.bucket
->versioned()));
1440 if (bucket_name
.empty()) {
1441 bucket_name
= "rgwx-" + zonegroup
.get_name() + "-" + oc
.tier
.storage_class
+
1443 boost::algorithm::to_lower(bucket_name
);
1446 /* Create RGW REST connection */
1447 S3RESTConn
conn(oc
.cct
, oc
.store
, id
, { endpoint
}, key
, region
, host_style
);
1449 RGWLCCloudTierCtx
tier_ctx(oc
.cct
, oc
.dpp
, oc
.o
, oc
.store
, oc
.bucket
->get_info(),
1450 oc
.obj
.get(), oc
.rctx
, conn
, bucket_name
,
1451 oc
.tier
.t
.s3
.target_storage_class
);
1452 tier_ctx
.acl_mappings
= oc
.tier
.t
.s3
.acl_mappings
;
1453 tier_ctx
.multipart_min_part_size
= oc
.tier
.t
.s3
.multipart_min_part_size
;
1454 tier_ctx
.multipart_sync_threshold
= oc
.tier
.t
.s3
.multipart_sync_threshold
;
1455 tier_ctx
.storage_class
= oc
.tier
.storage_class
;
1457 // check if target_path is already created
1458 std::set
<std::string
>& cloud_targets
= oc
.env
.worker
->get_cloud_targets();
1459 std::pair
<std::set
<std::string
>::iterator
, bool> it
;
1461 it
= cloud_targets
.insert(bucket_name
);
1462 tier_ctx
.target_bucket_created
= !(it
.second
);
1464 ldpp_dout(oc
.dpp
, 0) << "Transitioning object(" << oc
.o
.key
<< ") to the cloud endpoint(" << endpoint
<< ")" << dendl
;
1466 /* Transition object to cloud end point */
1467 int ret
= rgw_cloud_tier_transfer_object(tier_ctx
);
1470 ldpp_dout(oc
.dpp
, 0) << "ERROR: failed to transfer object(" << oc
.o
.key
<< ") to the cloud endpoint(" << endpoint
<< ") ret=" << ret
<< dendl
;
1473 if (!tier_ctx
.target_bucket_created
) {
1474 cloud_targets
.erase(it
.first
);
1478 if (delete_object
) {
1479 ret
= delete_tier_obj(oc
);
1481 ldpp_dout(oc
.dpp
, 0) << "ERROR: Deleting tier object(" << oc
.o
.key
<< ") failed ret=" << ret
<< dendl
;
1485 ret
= update_tier_obj(oc
, tier_ctx
);
1487 ldpp_dout(oc
.dpp
, 0) << "ERROR: Updating tier object(" << oc
.o
.key
<< ") failed ret=" << ret
<< dendl
;
1495 int process(lc_op_ctx
& oc
) {
1499 if (oc
.o
.meta
.category
== RGWObjCategory::CloudTiered
) {
1500 /* Skip objects which are already cloud tiered. */
1501 ldpp_dout(oc
.dpp
, 30) << "Object(key:" << oc
.o
.key
<< ") is already cloud tiered to cloud-s3 tier: " << oc
.o
.meta
.storage_class
<< dendl
;
1505 std::string tier_type
= "";
1506 const RGWZoneGroup
& zonegroup
= oc
.store
->get_zone()->get_zonegroup();
1508 rgw_placement_rule target_placement
;
1509 target_placement
.inherit_from(oc
.bucket
->get_placement_rule());
1510 target_placement
.storage_class
= transition
.storage_class
;
1512 r
= get_tier_target(zonegroup
, target_placement
, oc
.tier
);
1514 if (!r
&& oc
.tier
.tier_type
== "cloud-s3") {
1515 ldpp_dout(oc
.dpp
, 30) << "Found cloud s3 tier: " << target_placement
.storage_class
<< dendl
;
1516 if (!oc
.o
.is_current() &&
1517 !pass_object_lock_check(oc
.store
, oc
.obj
.get(), oc
.rctx
, oc
.dpp
)) {
1518 /* Skip objects which has object lock enabled. */
1519 ldpp_dout(oc
.dpp
, 10) << "Object(key:" << oc
.o
.key
<< ") is locked. Skipping transition to cloud-s3 tier: " << target_placement
.storage_class
<< dendl
;
1523 /* Allow transition for only RadosStore */
1524 rgw::sal::RadosStore
*rados
= dynamic_cast<rgw::sal::RadosStore
*>(oc
.store
);
1527 ldpp_dout(oc
.dpp
, 10) << "Object(key:" << oc
.o
.key
<< ") is not on RadosStore. Skipping transition to cloud-s3 tier: " << target_placement
.storage_class
<< dendl
;
1531 r
= transition_obj_to_cloud(oc
);
1533 ldpp_dout(oc
.dpp
, 0) << "ERROR: failed to transition obj(key:" << oc
.o
.key
<< ") to cloud (r=" << r
<< ")"
1538 if (!oc
.store
->get_zone()->get_params().
1539 valid_placement(target_placement
)) {
1540 ldpp_dout(oc
.dpp
, 0) << "ERROR: non existent dest placement: "
1542 << " bucket="<< oc
.bucket
1543 << " rule_id=" << oc
.op
.id
1544 << " " << oc
.wq
->thr_name() << dendl
;
1548 int r
= oc
.obj
->transition(oc
.rctx
, oc
.bucket
, target_placement
, o
.meta
.mtime
,
1549 o
.versioned_epoch
, oc
.dpp
, null_yield
);
1551 ldpp_dout(oc
.dpp
, 0) << "ERROR: failed to transition obj "
1552 << oc
.bucket
<< ":" << o
.key
1553 << " -> " << transition
.storage_class
1554 << " " << cpp_strerror(r
)
1555 << " " << oc
.wq
->thr_name() << dendl
;
1559 ldpp_dout(oc
.dpp
, 2) << "TRANSITIONED:" << oc
.bucket
1560 << ":" << o
.key
<< " -> "
1561 << transition
.storage_class
1562 << " " << oc
.wq
->thr_name() << dendl
;
1567 class LCOpAction_CurrentTransition
: public LCOpAction_Transition
{
1569 bool check_current_state(bool is_current
) override
{
1573 ceph::real_time
get_effective_mtime(lc_op_ctx
& oc
) override
{
1574 return oc
.o
.meta
.mtime
;
1577 LCOpAction_CurrentTransition(const transition_action
& _transition
)
1578 : LCOpAction_Transition(_transition
) {}
1579 int process(lc_op_ctx
& oc
) {
1580 int r
= LCOpAction_Transition::process(oc
);
1583 perfcounter
->inc(l_rgw_lc_transition_current
, 1);
1590 class LCOpAction_NonCurrentTransition
: public LCOpAction_Transition
{
1592 bool check_current_state(bool is_current
) override
{
1596 ceph::real_time
get_effective_mtime(lc_op_ctx
& oc
) override
{
1597 return oc
.effective_mtime
;
1600 LCOpAction_NonCurrentTransition(op_env
& env
,
1601 const transition_action
& _transition
)
1602 : LCOpAction_Transition(_transition
)
1604 int process(lc_op_ctx
& oc
) {
1605 int r
= LCOpAction_Transition::process(oc
);
1608 perfcounter
->inc(l_rgw_lc_transition_noncurrent
, 1);
1615 void LCOpRule::build()
1617 filters
.emplace_back(new LCOpFilter_Tags
);
1621 if (op
.expiration
> 0 ||
1622 op
.expiration_date
!= boost::none
) {
1623 actions
.emplace_back(new LCOpAction_CurrentExpiration(env
));
1626 if (op
.dm_expiration
) {
1627 actions
.emplace_back(new LCOpAction_DMExpiration(env
));
1630 if (op
.noncur_expiration
> 0) {
1631 actions
.emplace_back(new LCOpAction_NonCurrentExpiration(env
));
1634 for (auto& iter
: op
.transitions
) {
1635 actions
.emplace_back(new LCOpAction_CurrentTransition(iter
.second
));
1638 for (auto& iter
: op
.noncur_transitions
) {
1639 actions
.emplace_back(new LCOpAction_NonCurrentTransition(env
, iter
.second
));
1643 void LCOpRule::update()
1645 next_key_name
= env
.ol
.next_key_name();
1646 effective_mtime
= env
.ol
.get_prev_obj().meta
.mtime
;
1649 int LCOpRule::process(rgw_bucket_dir_entry
& o
,
1650 const DoutPrefixProvider
*dpp
,
1653 lc_op_ctx
ctx(env
, o
, next_key_name
, effective_mtime
, dpp
, wq
);
1654 shared_ptr
<LCOpAction
> *selected
= nullptr; // n.b., req'd by sharing
1657 for (auto& a
: actions
) {
1658 real_time action_exp
;
1660 if (a
->check(ctx
, &action_exp
, dpp
)) {
1661 if (action_exp
> exp
) {
1669 (*selected
)->should_process()) {
1672 * Calling filter checks after action checks because
1673 * all action checks (as they are implemented now) do
1674 * not access the objects themselves, but return result
1675 * from info from bucket index listing. The current tags filter
1676 * check does access the objects, so we avoid unnecessary rados calls
1677 * having filters check later in the process.
1681 for (auto& f
: filters
) {
1682 if (f
->check(dpp
, ctx
)) {
1689 ldpp_dout(dpp
, 20) << __func__
<< "(): key=" << o
.key
1690 << ": no rule match, skipping "
1691 << wq
->thr_name() << dendl
;
1695 int r
= (*selected
)->process(ctx
);
1697 ldpp_dout(dpp
, 0) << "ERROR: remove_expired_obj "
1698 << env
.bucket
<< ":" << o
.key
1699 << " " << cpp_strerror(r
)
1700 << " " << wq
->thr_name() << dendl
;
1703 ldpp_dout(dpp
, 20) << "processed:" << env
.bucket
<< ":"
1704 << o
.key
<< " " << wq
->thr_name() << dendl
;
1711 int RGWLC::bucket_lc_process(string
& shard_id
, LCWorker
* worker
,
1712 time_t stop_at
, bool once
)
1714 RGWLifecycleConfiguration
config(cct
);
1715 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
1716 string no_ns
, list_versions
;
1717 vector
<rgw_bucket_dir_entry
> objs
;
1718 vector
<std::string
> result
;
1719 boost::split(result
, shard_id
, boost::is_any_of(":"));
1720 string bucket_tenant
= result
[0];
1721 string bucket_name
= result
[1];
1722 string bucket_marker
= result
[2];
1723 int ret
= store
->get_bucket(this, nullptr, bucket_tenant
, bucket_name
, &bucket
, null_yield
);
1725 ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name
1726 << " failed" << dendl
;
1730 ret
= bucket
->load_bucket(this, null_yield
);
1732 ldpp_dout(this, 0) << "LC:load_bucket for " << bucket_name
1733 << " failed" << dendl
;
1737 auto stack_guard
= make_scope_guard(
1740 worker
->workpool
->drain();
1744 if (bucket
->get_marker() != bucket_marker
) {
1745 ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket="
1746 << bucket_tenant
<< ":" << bucket_name
1747 << " cur_marker=" << bucket
->get_marker()
1748 << " orig_marker=" << bucket_marker
<< dendl
;
1752 map
<string
, bufferlist
>::iterator aiter
1753 = bucket
->get_attrs().find(RGW_ATTR_LC
);
1754 if (aiter
== bucket
->get_attrs().end()) {
1755 ldpp_dout(this, 0) << "WARNING: bucket_attrs.find(RGW_ATTR_LC) failed for "
1756 << bucket_name
<< " (terminates bucket_lc_process(...))"
1761 bufferlist::const_iterator iter
{&aiter
->second
};
1763 config
.decode(iter
);
1764 } catch (const buffer::error
& e
) {
1765 ldpp_dout(this, 0) << __func__
<< "() decode life cycle config failed"
1770 auto pf
= [](RGWLC::LCWorker
* wk
, WorkQ
* wq
, WorkItem
& wi
) {
1772 boost::get
<std::tuple
<LCOpRule
, rgw_bucket_dir_entry
>>(wi
);
1773 auto& [op_rule
, o
] = wt
;
1775 ldpp_dout(wk
->get_lc(), 20)
1776 << __func__
<< "(): key=" << o
.key
<< wq
->thr_name()
1778 int ret
= op_rule
.process(o
, wk
->dpp
, wq
);
1780 ldpp_dout(wk
->get_lc(), 20)
1781 << "ERROR: orule.process() returned ret=" << ret
1782 << "thread:" << wq
->thr_name()
1786 worker
->workpool
->setf(pf
);
1788 multimap
<string
, lc_op
>& prefix_map
= config
.get_prefix_map();
1789 ldpp_dout(this, 10) << __func__
<< "() prefix_map size="
1790 << prefix_map
.size()
1793 rgw_obj_key pre_marker
;
1794 rgw_obj_key next_marker
;
1795 for(auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end();
1798 if (worker_should_stop(stop_at
, once
)) {
1799 ldpp_dout(this, 5) << __func__
<< " interval budget EXPIRED worker "
1805 auto& op
= prefix_iter
->second
;
1806 if (!is_valid_op(op
)) {
1809 ldpp_dout(this, 20) << __func__
<< "(): prefix=" << prefix_iter
->first
1811 if (prefix_iter
!= prefix_map
.begin() &&
1812 (prefix_iter
->first
.compare(0, prev(prefix_iter
)->first
.length(),
1813 prev(prefix_iter
)->first
) == 0)) {
1814 next_marker
= pre_marker
;
1816 pre_marker
= next_marker
;
1819 LCObjsLister
ol(store
, bucket
.get());
1820 ol
.set_prefix(prefix_iter
->first
);
1822 ret
= ol
.init(this);
1824 if (ret
== (-ENOENT
))
1826 ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl
;
1830 op_env
oenv(op
, store
, worker
, bucket
.get(), ol
);
1831 LCOpRule
orule(oenv
);
1832 orule
.build(); // why can't ctor do it?
1833 rgw_bucket_dir_entry
* o
{nullptr};
1834 for (; ol
.get_obj(this, &o
/* , fetch_barrier */); ol
.next()) {
1836 std::tuple
<LCOpRule
, rgw_bucket_dir_entry
> t1
= {orule
, *o
};
1837 worker
->workpool
->enqueue(WorkItem
{t1
});
1839 worker
->workpool
->drain();
1842 ret
= handle_multipart_expiration(bucket
.get(), prefix_map
, worker
, stop_at
, once
);
1846 int RGWLC::bucket_lc_post(int index
, int max_lock_sec
,
1847 rgw::sal::Lifecycle::LCEntry
& entry
, int& result
,
1850 utime_t
lock_duration(cct
->_conf
->rgw_lc_lock_max_time
, 0);
1852 rgw::sal::LCSerializer
* lock
= sal_lc
->get_serializer(lc_index_lock_name
,
1856 ldpp_dout(this, 5) << "RGWLC::bucket_lc_post(): POST " << entry
1857 << " index: " << index
<< " worker ix: " << worker
->ix
1861 int ret
= lock
->try_lock(this, lock_duration
, null_yield
);
1862 if (ret
== -EBUSY
|| ret
== -EEXIST
) {
1863 /* already locked by another lc processor */
1864 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
1865 << obj_names
[index
] << ", sleep 5, try again " << dendl
;
1872 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names
[index
]
1874 if (result
== -ENOENT
) {
1875 /* XXXX are we SURE the only way result could == ENOENT is when
1876 * there is no such bucket? It is currently the value returned
1877 * from bucket_lc_process(...) */
1878 ret
= sal_lc
->rm_entry(obj_names
[index
], entry
);
1880 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
1881 << obj_names
[index
] << dendl
;
1884 } else if (result
< 0) {
1885 entry
.status
= lc_failed
;
1887 entry
.status
= lc_complete
;
1890 ret
= sal_lc
->set_entry(obj_names
[index
], entry
);
1892 ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
1893 << obj_names
[index
] << dendl
;
1898 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock "
1899 << obj_names
[index
] << dendl
;
1904 int RGWLC::list_lc_progress(string
& marker
, uint32_t max_entries
,
1905 vector
<rgw::sal::Lifecycle::LCEntry
>& progress_map
,
1908 progress_map
.clear();
1909 for(; index
< max_objs
; index
++, marker
="") {
1910 vector
<rgw::sal::Lifecycle::LCEntry
> entries
;
1911 int ret
= sal_lc
->list_entries(obj_names
[index
], marker
, max_entries
, entries
);
1913 if (ret
== -ENOENT
) {
1914 ldpp_dout(this, 10) << __func__
<< "() ignoring unfound lc object="
1915 << obj_names
[index
] << dendl
;
1921 progress_map
.reserve(progress_map
.size() + entries
.size());
1922 progress_map
.insert(progress_map
.end(), entries
.begin(), entries
.end());
1924 /* update index, marker tuple */
1925 if (progress_map
.size() > 0)
1926 marker
= progress_map
.back().bucket
;
1928 if (progress_map
.size() >= max_entries
)
1934 static inline vector
<int> random_sequence(uint32_t n
)
1936 vector
<int> v(n
, 0);
1937 std::generate(v
.begin(), v
.end(),
1938 [ix
= 0]() mutable {
1941 std::random_device rd
;
1942 std::default_random_engine rng
{rd()};
1943 std::shuffle(v
.begin(), v
.end(), rd
);
1947 static inline int get_lc_index(CephContext
*cct
,
1948 const std::string
& shard_id
)
1951 (cct
->_conf
->rgw_lc_max_objs
> HASH_PRIME
? HASH_PRIME
:
1952 cct
->_conf
->rgw_lc_max_objs
);
1953 /* n.b. review hash algo */
1954 int index
= ceph_str_hash_linux(shard_id
.c_str(),
1955 shard_id
.size()) % HASH_PRIME
% max_objs
;
1959 static inline void get_lc_oid(CephContext
*cct
,
1960 const std::string
& shard_id
, string
*oid
)
1962 /* n.b. review hash algo */
1963 int index
= get_lc_index(cct
, shard_id
);
1964 *oid
= lc_oid_prefix
;
1966 snprintf(buf
, 32, ".%d", index
);
1971 static std::string
get_lc_shard_name(const rgw_bucket
& bucket
){
1972 return string_join_reserve(':', bucket
.tenant
, bucket
.name
, bucket
.marker
);
1975 int RGWLC::process(LCWorker
* worker
,
1976 const std::unique_ptr
<rgw::sal::Bucket
>& optional_bucket
,
1980 int max_secs
= cct
->_conf
->rgw_lc_lock_max_time
;
1982 if (optional_bucket
) {
1983 /* if a bucket is provided, this is a single-bucket run, and
1984 * can be processed without traversing any state entries (we
1985 * do need the entry {pro,epi}logue which update the state entry
1986 * for this bucket) */
1987 auto bucket_entry_marker
= get_lc_shard_name(optional_bucket
->get_key());
1988 auto index
= get_lc_index(store
->ctx(), bucket_entry_marker
);
1989 ret
= process_bucket(index
, max_secs
, worker
, bucket_entry_marker
, once
);
1992 /* generate an index-shard sequence unrelated to any other
1993 * that might be running in parallel */
1994 std::string all_buckets
{""};
1995 vector
<int> shard_seq
= random_sequence(max_objs
);
1996 for (auto index
: shard_seq
) {
1997 ret
= process(index
, max_secs
, worker
, once
);
2006 bool RGWLC::expired_session(time_t started
)
2008 time_t interval
= (cct
->_conf
->rgw_lc_debug_interval
> 0)
2009 ? cct
->_conf
->rgw_lc_debug_interval
2012 auto now
= time(nullptr);
2014 ldpp_dout(this, 16) << "RGWLC::expired_session"
2015 << " started: " << started
2016 << " interval: " << interval
<< "(*2==" << 2*interval
<< ")"
2020 return (started
+ 2*interval
< now
);
2023 time_t RGWLC::thread_stop_at()
2025 uint64_t interval
= (cct
->_conf
->rgw_lc_debug_interval
> 0)
2026 ? cct
->_conf
->rgw_lc_debug_interval
2029 return time(nullptr) + interval
;
2032 int RGWLC::process_bucket(int index
, int max_lock_secs
, LCWorker
* worker
,
2033 const std::string
& bucket_entry_marker
,
2036 ldpp_dout(this, 5) << "RGWLC::process_bucket(): ENTER: "
2037 << "index: " << index
<< " worker ix: " << worker
->ix
2041 std::unique_ptr
<rgw::sal::LCSerializer
> serializer(
2042 sal_lc
->get_serializer(lc_index_lock_name
, obj_names
[index
],
2045 rgw::sal::Lifecycle::LCEntry entry
;
2046 if (max_lock_secs
<= 0) {
2050 utime_t
time(max_lock_secs
, 0);
2051 ret
= serializer
->try_lock(this, time
, null_yield
);
2052 if (ret
== -EBUSY
|| ret
== -EEXIST
) {
2053 /* already locked by another lc processor */
2054 ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
2055 << obj_names
[index
] << dendl
;
2061 std::unique_lock
<rgw::sal::LCSerializer
> lock(
2062 *(serializer
.get()), std::adopt_lock
);
2064 ret
= sal_lc
->get_entry(obj_names
[index
], bucket_entry_marker
, entry
);
2066 if (entry
.status
== lc_processing
) {
2067 if (expired_session(entry
.start_time
)) {
2068 ldpp_dout(this, 5) << "RGWLC::process_bucket(): STALE lc session found for: " << entry
2069 << " index: " << index
<< " worker ix: " << worker
->ix
2073 ldpp_dout(this, 5) << "RGWLC::process_bucket(): ACTIVE entry: "
2075 << " index: " << index
2076 << " worker ix: " << worker
->ix
2083 /* do nothing if no bucket */
2084 if (entry
.bucket
.empty()) {
2088 ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 1: " << entry
2089 << " index: " << index
<< " worker ix: " << worker
->ix
2092 entry
.status
= lc_processing
;
2093 ret
= sal_lc
->set_entry(obj_names
[index
], entry
);
2095 ldpp_dout(this, 0) << "RGWLC::process_bucket() failed to set obj entry "
2096 << obj_names
[index
] << entry
.bucket
<< entry
.status
2101 ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 2: " << entry
2102 << " index: " << index
<< " worker ix: " << worker
->ix
2106 ret
= bucket_lc_process(entry
.bucket
, worker
, thread_stop_at(), once
);
2107 bucket_lc_post(index
, max_lock_secs
, entry
, ret
, worker
);
2110 } /* RGWLC::process_bucket */
2112 int RGWLC::process(int index
, int max_lock_secs
, LCWorker
* worker
,
2115 ldpp_dout(this, 5) << "RGWLC::process(): ENTER: "
2116 << "index: " << index
<< " worker ix: " << worker
->ix
2120 rgw::sal::LCSerializer
* lock
= sal_lc
->get_serializer(lc_index_lock_name
,
2124 utime_t now
= ceph_clock_now();
2125 //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
2126 rgw::sal::Lifecycle::LCEntry entry
;
2127 if (max_lock_secs
<= 0)
2130 utime_t
time(max_lock_secs
, 0);
2131 ret
= lock
->try_lock(this, time
, null_yield
);
2132 if (ret
== -EBUSY
|| ret
== -EEXIST
) {
2133 /* already locked by another lc processor */
2134 ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
2135 << obj_names
[index
] << ", sleep 5, try again" << dendl
;
2137 continue; // XXXX really retry forever?
2142 rgw::sal::Lifecycle::LCHead head
;
2143 ret
= sal_lc
->get_head(obj_names
[index
], head
);
2145 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
2146 << obj_names
[index
] << ", ret=" << ret
<< dendl
;
2150 ret
= sal_lc
->get_entry(obj_names
[index
], head
.marker
, entry
);
2152 if (entry
.status
== lc_processing
) {
2153 if (expired_session(entry
.start_time
)) {
2154 ldpp_dout(this, 5) << "RGWLC::process(): STALE lc session found for: " << entry
2155 << " index: " << index
<< " worker ix: " << worker
->ix
2159 ldpp_dout(this, 5) << "RGWLC::process(): ACTIVE entry: " << entry
2160 << " index: " << index
<< " worker ix: " << worker
->ix
2167 if(!if_already_run_today(head
.start_date
) ||
2169 head
.start_date
= now
;
2170 head
.marker
.clear();
2171 ret
= bucket_lc_prepare(index
, worker
);
2173 ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object "
2181 ret
= sal_lc
->get_next_entry(obj_names
[index
], head
.marker
, entry
);
2183 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
2184 << obj_names
[index
] << dendl
;
2188 /* termination condition (eof) */
2189 if (entry
.bucket
.empty())
2192 ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
2193 << " index: " << index
<< " worker ix: " << worker
->ix
2196 entry
.status
= lc_processing
;
2197 ret
= sal_lc
->set_entry(obj_names
[index
], entry
);
2199 ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
2200 << obj_names
[index
] << entry
.bucket
<< entry
.status
<< dendl
;
2204 head
.marker
= entry
.bucket
;
2205 ret
= sal_lc
->put_head(obj_names
[index
], head
);
2207 ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
2213 ldpp_dout(this, 5) << "RGWLC::process(): START entry 2: " << entry
2214 << " index: " << index
<< " worker ix: " << worker
->ix
2218 ret
= bucket_lc_process(entry
.bucket
, worker
, thread_stop_at(), once
);
2219 bucket_lc_post(index
, max_lock_secs
, entry
, ret
, worker
);
2220 } while(1 && !once
);
2231 void RGWLC::start_processor()
2233 auto maxw
= cct
->_conf
->rgw_lc_max_worker
;
2234 workers
.reserve(maxw
);
2235 for (int ix
= 0; ix
< maxw
; ++ix
) {
2237 std::make_unique
<RGWLC::LCWorker
>(this /* dpp */, cct
, this, ix
);
2238 worker
->create((string
{"lifecycle_thr_"} + to_string(ix
)).c_str());
2239 workers
.emplace_back(std::move(worker
));
2243 void RGWLC::stop_processor()
2246 for (auto& worker
: workers
) {
2253 unsigned RGWLC::get_subsys() const
2258 std::ostream
& RGWLC::gen_prefix(std::ostream
& out
) const
2260 return out
<< "lifecycle: ";
2263 void RGWLC::LCWorker::stop()
2265 std::lock_guard l
{lock
};
2269 bool RGWLC::going_down()
2274 bool RGWLC::LCWorker::should_work(utime_t
& now
)
2280 string worktime
= cct
->_conf
->rgw_lifecycle_work_time
;
2281 sscanf(worktime
.c_str(),"%d:%d-%d:%d",&start_hour
, &start_minute
,
2282 &end_hour
, &end_minute
);
2284 time_t tt
= now
.sec();
2285 localtime_r(&tt
, &bdt
);
2287 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
2288 /* We're debugging, so say we can run */
2290 } else if ((bdt
.tm_hour
*60 + bdt
.tm_min
>= start_hour
*60 + start_minute
) &&
2291 (bdt
.tm_hour
*60 + bdt
.tm_min
<= end_hour
*60 + end_minute
)) {
2299 int RGWLC::LCWorker::schedule_next_start_time(utime_t
&start
, utime_t
& now
)
2303 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
2304 secs
= start
+ cct
->_conf
->rgw_lc_debug_interval
- now
;
2314 string worktime
= cct
->_conf
->rgw_lifecycle_work_time
;
2315 sscanf(worktime
.c_str(),"%d:%d-%d:%d",&start_hour
, &start_minute
, &end_hour
,
2318 time_t tt
= now
.sec();
2320 localtime_r(&tt
, &bdt
);
2321 bdt
.tm_hour
= start_hour
;
2322 bdt
.tm_min
= start_minute
;
2327 return secs
>0 ? secs
: secs
+24*60*60;
2330 RGWLC::LCWorker::~LCWorker()
2335 void RGWLifecycleConfiguration::generate_test_instances(
2336 list
<RGWLifecycleConfiguration
*>& o
)
2338 o
.push_back(new RGWLifecycleConfiguration
);
2341 template<typename F
>
2342 static int guard_lc_modify(const DoutPrefixProvider
*dpp
,
2343 rgw::sal::Store
* store
,
2344 rgw::sal::Lifecycle
* sal_lc
,
2345 const rgw_bucket
& bucket
, const string
& cookie
,
2347 CephContext
*cct
= store
->ctx();
2349 string shard_id
= get_lc_shard_name(bucket
);
2352 get_lc_oid(cct
, shard_id
, &oid
);
2354 /* XXX it makes sense to take shard_id for a bucket_id? */
2355 rgw::sal::Lifecycle::LCEntry entry
;
2356 entry
.bucket
= shard_id
;
2357 entry
.status
= lc_uninitial
;
2358 int max_lock_secs
= cct
->_conf
->rgw_lc_lock_max_time
;
2360 rgw::sal::LCSerializer
* lock
= sal_lc
->get_serializer(lc_index_lock_name
,
2363 utime_t
time(max_lock_secs
, 0);
2368 ret
= lock
->try_lock(dpp
, time
, null_yield
);
2369 if (ret
== -EBUSY
|| ret
== -EEXIST
) {
2370 ldpp_dout(dpp
, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
2371 << oid
<< ", sleep 5, try again" << dendl
;
2372 sleep(5); // XXX: return retryable error
2376 ldpp_dout(dpp
, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
2377 << oid
<< ", ret=" << ret
<< dendl
;
2380 ret
= f(sal_lc
, oid
, entry
);
2382 ldpp_dout(dpp
, 0) << "RGWLC::RGWPutLC() failed to set entry on "
2383 << oid
<< ", ret=" << ret
<< dendl
;
2392 int RGWLC::set_bucket_config(rgw::sal::Bucket
* bucket
,
2393 const rgw::sal::Attrs
& bucket_attrs
,
2394 RGWLifecycleConfiguration
*config
)
2396 rgw::sal::Attrs attrs
= bucket_attrs
;
2398 config
->encode(lc_bl
);
2400 attrs
[RGW_ATTR_LC
] = std::move(lc_bl
);
2403 bucket
->merge_and_store_attrs(this, attrs
, null_yield
);
2407 rgw_bucket
& b
= bucket
->get_key();
2410 ret
= guard_lc_modify(this, store
, sal_lc
.get(), b
, cookie
,
2411 [&](rgw::sal::Lifecycle
* sal_lc
, const string
& oid
,
2412 const rgw::sal::Lifecycle::LCEntry
& entry
) {
2413 return sal_lc
->set_entry(oid
, entry
);
2419 int RGWLC::remove_bucket_config(rgw::sal::Bucket
* bucket
,
2420 const rgw::sal::Attrs
& bucket_attrs
)
2422 rgw::sal::Attrs attrs
= bucket_attrs
;
2423 attrs
.erase(RGW_ATTR_LC
);
2424 int ret
= bucket
->merge_and_store_attrs(this, attrs
, null_yield
);
2426 rgw_bucket
& b
= bucket
->get_key();
2429 ldpp_dout(this, 0) << "RGWLC::RGWDeleteLC() failed to set attrs on bucket="
2430 << b
.name
<< " returned err=" << ret
<< dendl
;
2435 ret
= guard_lc_modify(this, store
, sal_lc
.get(), b
, cookie
,
2436 [&](rgw::sal::Lifecycle
* sal_lc
, const string
& oid
,
2437 const rgw::sal::Lifecycle::LCEntry
& entry
) {
2438 return sal_lc
->rm_entry(oid
, entry
);
2442 } /* RGWLC::remove_bucket_config */
2452 int fix_lc_shard_entry(const DoutPrefixProvider
*dpp
,
2453 rgw::sal::Store
* store
,
2454 rgw::sal::Lifecycle
* sal_lc
,
2455 rgw::sal::Bucket
* bucket
)
2457 if (auto aiter
= bucket
->get_attrs().find(RGW_ATTR_LC
);
2458 aiter
== bucket
->get_attrs().end()) {
2459 return 0; // No entry, nothing to fix
2462 auto shard_name
= get_lc_shard_name(bucket
->get_key());
2464 get_lc_oid(store
->ctx(), shard_name
, &lc_oid
);
2466 rgw::sal::Lifecycle::LCEntry entry
;
2467 // There are multiple cases we need to encounter here
2468 // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
2469 // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update
2470 // 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker)
2471 // We are not dropping the old marker here as that would be caught by the next LC process update
2472 int ret
= sal_lc
->get_entry(lc_oid
, shard_name
, entry
);
2474 ldpp_dout(dpp
, 5) << "Entry already exists, nothing to do" << dendl
;
2475 return ret
; // entry is already existing correctly set to marker
2477 ldpp_dout(dpp
, 5) << "lc_get_entry errored ret code=" << ret
<< dendl
;
2478 if (ret
== -ENOENT
) {
2479 ldpp_dout(dpp
, 1) << "No entry for bucket=" << bucket
2480 << " creating " << dendl
;
2481 // TODO: we have too many ppl making cookies like this!
2482 char cookie_buf
[COOKIE_LEN
+ 1];
2483 gen_rand_alphanumeric(store
->ctx(), cookie_buf
, sizeof(cookie_buf
) - 1);
2484 std::string cookie
= cookie_buf
;
2486 ret
= guard_lc_modify(dpp
,
2487 store
, sal_lc
, bucket
->get_key(), cookie
,
2488 [&lc_oid
](rgw::sal::Lifecycle
* slc
,
2490 const rgw::sal::Lifecycle::LCEntry
& entry
) {
2491 return slc
->set_entry(lc_oid
, entry
);
2499 std::string
s3_expiration_header(
2500 DoutPrefixProvider
* dpp
,
2501 const rgw_obj_key
& obj_key
,
2502 const RGWObjTags
& obj_tagset
,
2503 const ceph::real_time
& mtime
,
2504 const std::map
<std::string
, buffer::list
>& bucket_attrs
)
2506 CephContext
* cct
= dpp
->get_cct();
2507 RGWLifecycleConfiguration
config(cct
);
2508 std::string hdr
{""};
2510 const auto& aiter
= bucket_attrs
.find(RGW_ATTR_LC
);
2511 if (aiter
== bucket_attrs
.end())
2514 bufferlist::const_iterator iter
{&aiter
->second
};
2516 config
.decode(iter
);
2517 } catch (const buffer::error
& e
) {
2518 ldpp_dout(dpp
, 0) << __func__
2519 << "() decode life cycle config failed"
2524 /* dump tags at debug level 16 */
2525 RGWObjTags::tag_map_t obj_tag_map
= obj_tagset
.get_tags();
2526 if (cct
->_conf
->subsys
.should_gather(ceph_subsys_rgw
, 16)) {
2527 for (const auto& elt
: obj_tag_map
) {
2528 ldpp_dout(dpp
, 16) << __func__
2529 << "() key=" << elt
.first
<< " val=" << elt
.second
2534 boost::optional
<ceph::real_time
> expiration_date
;
2535 boost::optional
<std::string
> rule_id
;
2537 const auto& rule_map
= config
.get_rule_map();
2538 for (const auto& ri
: rule_map
) {
2539 const auto& rule
= ri
.second
;
2540 auto& id
= rule
.get_id();
2541 auto& filter
= rule
.get_filter();
2542 auto& prefix
= filter
.has_prefix() ? filter
.get_prefix(): rule
.get_prefix();
2543 auto& expiration
= rule
.get_expiration();
2544 auto& noncur_expiration
= rule
.get_noncur_expiration();
2546 ldpp_dout(dpp
, 10) << "rule: " << ri
.first
2547 << " prefix: " << prefix
2549 << " date: " << expiration
.get_date()
2550 << " days: " << expiration
.get_days()
2551 << " noncur_expiration: "
2552 << " date: " << noncur_expiration
.get_date()
2553 << " days: " << noncur_expiration
.get_days()
2556 /* skip if rule !enabled
2557 * if rule has prefix, skip iff object !match prefix
2558 * if rule has tags, skip iff object !match tags
2559 * note if object is current or non-current, compare accordingly
2560 * if rule has days, construct date expression and save iff older
2562 * if rule has date, convert date expression and save iff older
2564 * if the date accum has a value, format it into hdr
2567 if (! rule
.is_enabled())
2570 if(! prefix
.empty()) {
2571 if (! boost::starts_with(obj_key
.name
, prefix
))
2575 if (filter
.has_tags()) {
2576 bool tag_match
= false;
2577 const RGWObjTags
& rule_tagset
= filter
.get_tags();
2578 for (auto& tag
: rule_tagset
.get_tags()) {
2579 /* remember, S3 tags are {key,value} tuples */
2581 auto obj_tag
= obj_tag_map
.find(tag
.first
);
2582 if (obj_tag
== obj_tag_map
.end() || obj_tag
->second
!= tag
.second
) {
2583 ldpp_dout(dpp
, 10) << "tag does not match obj_key=" << obj_key
2584 << " rule_id=" << id
2595 // compute a uniform expiration date
2596 boost::optional
<ceph::real_time
> rule_expiration_date
;
2597 const LCExpiration
& rule_expiration
=
2598 (obj_key
.instance
.empty()) ? expiration
: noncur_expiration
;
2600 if (rule_expiration
.has_date()) {
2601 rule_expiration_date
=
2602 boost::optional
<ceph::real_time
>(
2603 ceph::from_iso_8601(rule
.get_expiration().get_date()));
2605 if (rule_expiration
.has_days()) {
2606 rule_expiration_date
=
2607 boost::optional
<ceph::real_time
>(
2608 mtime
+ make_timespan(double(rule_expiration
.get_days())*24*60*60 - ceph::real_clock::to_time_t(mtime
)%(24*60*60) + 24*60*60));
2612 // update earliest expiration
2613 if (rule_expiration_date
) {
2614 if ((! expiration_date
) ||
2615 (*expiration_date
> *rule_expiration_date
)) {
2617 boost::optional
<ceph::real_time
>(rule_expiration_date
);
2618 rule_id
= boost::optional
<std::string
>(id
);
2623 // cond format header
2624 if (expiration_date
&& rule_id
) {
2625 // Fri, 23 Dec 2012 00:00:00 GMT
2627 time_t exp
= ceph::real_clock::to_time_t(*expiration_date
);
2628 if (std::strftime(exp_buf
, sizeof(exp_buf
),
2629 "%a, %d %b %Y %T %Z", std::gmtime(&exp
))) {
2630 hdr
= fmt::format("expiry-date=\"{0}\", rule-id=\"{1}\"", exp_buf
,
2633 ldpp_dout(dpp
, 0) << __func__
<<
2634 "() strftime of life cycle expiration header failed"
2641 } /* rgwlc_s3_expiration_header */
2643 bool s3_multipart_abort_header(
2644 DoutPrefixProvider
* dpp
,
2645 const rgw_obj_key
& obj_key
,
2646 const ceph::real_time
& mtime
,
2647 const std::map
<std::string
, buffer::list
>& bucket_attrs
,
2648 ceph::real_time
& abort_date
,
2649 std::string
& rule_id
)
2651 CephContext
* cct
= dpp
->get_cct();
2652 RGWLifecycleConfiguration
config(cct
);
2654 const auto& aiter
= bucket_attrs
.find(RGW_ATTR_LC
);
2655 if (aiter
== bucket_attrs
.end())
2658 bufferlist::const_iterator iter
{&aiter
->second
};
2660 config
.decode(iter
);
2661 } catch (const buffer::error
& e
) {
2662 ldpp_dout(dpp
, 0) << __func__
2663 << "() decode life cycle config failed"
2668 std::optional
<ceph::real_time
> abort_date_tmp
;
2669 std::optional
<std::string_view
> rule_id_tmp
;
2670 const auto& rule_map
= config
.get_rule_map();
2671 for (const auto& ri
: rule_map
) {
2672 const auto& rule
= ri
.second
;
2673 const auto& id
= rule
.get_id();
2674 const auto& filter
= rule
.get_filter();
2675 const auto& prefix
= filter
.has_prefix()?filter
.get_prefix():rule
.get_prefix();
2676 const auto& mp_expiration
= rule
.get_mp_expiration();
2677 if (!rule
.is_enabled()) {
2680 if(!prefix
.empty() && !boost::starts_with(obj_key
.name
, prefix
)) {
2684 std::optional
<ceph::real_time
> rule_abort_date
;
2685 if (mp_expiration
.has_days()) {
2686 rule_abort_date
= std::optional
<ceph::real_time
>(
2687 mtime
+ make_timespan(mp_expiration
.get_days()*24*60*60 - ceph::real_clock::to_time_t(mtime
)%(24*60*60) + 24*60*60));
2690 // update earliest abort date
2691 if (rule_abort_date
) {
2692 if ((! abort_date_tmp
) ||
2693 (*abort_date_tmp
> *rule_abort_date
)) {
2695 std::optional
<ceph::real_time
>(rule_abort_date
);
2696 rule_id_tmp
= std::optional
<std::string_view
>(id
);
2700 if (abort_date_tmp
&& rule_id_tmp
) {
2701 abort_date
= *abort_date_tmp
;
2702 rule_id
= *rule_id_tmp
;
2709 } /* namespace rgw::lc */
2711 void lc_op::dump(Formatter
*f
) const
2713 f
->dump_bool("status", status
);
2714 f
->dump_bool("dm_expiration", dm_expiration
);
2716 f
->dump_int("expiration", expiration
);
2717 f
->dump_int("noncur_expiration", noncur_expiration
);
2718 f
->dump_int("mp_expiration", mp_expiration
);
2719 if (expiration_date
) {
2720 utime_t
ut(*expiration_date
);
2721 f
->dump_stream("expiration_date") << ut
;
2724 f
->dump_object("obj_tags", *obj_tags
);
2726 f
->open_object_section("transitions");
2727 for(auto& [storage_class
, transition
] : transitions
) {
2728 f
->dump_object(storage_class
, transition
);
2732 f
->open_object_section("noncur_transitions");
2733 for (auto& [storage_class
, transition
] : noncur_transitions
) {
2734 f
->dump_object(storage_class
, transition
);
2739 void LCFilter::dump(Formatter
*f
) const
2741 f
->dump_string("prefix", prefix
);
2742 f
->dump_object("obj_tags", obj_tags
);
2745 void LCExpiration::dump(Formatter
*f
) const
2747 f
->dump_string("days", days
);
2748 f
->dump_string("date", date
);
2751 void LCRule::dump(Formatter
*f
) const
2753 f
->dump_string("id", id
);
2754 f
->dump_string("prefix", prefix
);
2755 f
->dump_string("status", status
);
2756 f
->dump_object("expiration", expiration
);
2757 f
->dump_object("noncur_expiration", noncur_expiration
);
2758 f
->dump_object("mp_expiration", mp_expiration
);
2759 f
->dump_object("filter", filter
);
2760 f
->open_object_section("transitions");
2761 for (auto& [storage_class
, transition
] : transitions
) {
2762 f
->dump_object(storage_class
, transition
);
2766 f
->open_object_section("noncur_transitions");
2767 for (auto& [storage_class
, transition
] : noncur_transitions
) {
2768 f
->dump_object(storage_class
, transition
);
2771 f
->dump_bool("dm_expiration", dm_expiration
);
2775 void RGWLifecycleConfiguration::dump(Formatter
*f
) const
2777 f
->open_object_section("prefix_map");
2778 for (auto& prefix
: prefix_map
) {
2779 f
->dump_object(prefix
.first
.c_str(), prefix
.second
);
2783 f
->open_array_section("rule_map");
2784 for (auto& rule
: rule_map
) {
2785 f
->open_object_section("entry");
2786 f
->dump_string("id", rule
.first
);
2787 f
->open_object_section("rule");
2788 rule
.second
.dump(f
);