]>
Commit | Line | Data |
---|---|---|
1adf2230 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
1adf2230 | 3 | |
7c673cae FG |
4 | #include <string.h> |
5 | #include <iostream> | |
6 | #include <map> | |
e306af50 TL |
7 | #include <algorithm> |
8 | #include <tuple> | |
9 | #include <functional> | |
7c673cae FG |
10 | |
11 | #include <boost/algorithm/string/split.hpp> | |
12 | #include <boost/algorithm/string.hpp> | |
9f95a23c | 13 | #include <boost/algorithm/string/predicate.hpp> |
e306af50 | 14 | #include <boost/variant.hpp> |
7c673cae | 15 | |
f6b5b4d7 | 16 | #include "include/scope_guard.h" |
7c673cae | 17 | #include "common/Formatter.h" |
e306af50 | 18 | #include "common/containers.h" |
7c673cae | 19 | #include <common/errno.h> |
11fdf7f2 | 20 | #include "include/random.h" |
7c673cae FG |
21 | #include "cls/rgw/cls_rgw_client.h" |
22 | #include "cls/lock/cls_lock_client.h" | |
e306af50 | 23 | #include "rgw_perf_counters.h" |
7c673cae FG |
24 | #include "rgw_common.h" |
25 | #include "rgw_bucket.h" | |
26 | #include "rgw_lc.h" | |
eafe8130 | 27 | #include "rgw_zone.h" |
11fdf7f2 | 28 | #include "rgw_string.h" |
9f95a23c | 29 | #include "rgw_multi.h" |
f6b5b4d7 | 30 | #include "rgw_sal.h" |
9f95a23c TL |
31 | |
32 | // this seems safe to use, at least for now--arguably, we should | |
33 | // prefer header-only fmt, in general | |
34 | #undef FMT_HEADER_ONLY | |
35 | #define FMT_HEADER_ONLY 1 | |
36 | #include "fmt/format.h" | |
11fdf7f2 TL |
37 | |
38 | #include "services/svc_sys_obj.h" | |
9f95a23c TL |
39 | #include "services/svc_zone.h" |
40 | #include "services/svc_tier_rados.h" | |
7c673cae FG |
41 | |
42 | #define dout_context g_ceph_context | |
43 | #define dout_subsys ceph_subsys_rgw | |
44 | ||
/* Printable names of the lifecycle entry states.
 * NOTE(review): presumably indexed by the lc_* status value (e.g.
 * lc_uninitial) -- confirm against the enum declaration in rgw_lc.h. */
const char* LC_STATUS[] = {
  "UNINITIAL", "PROCESSING", "FAILED", "COMPLETE"
};
51 | ||
7c673cae FG |
52 | using namespace librados; |
53 | ||
11fdf7f2 | 54 | bool LCRule::valid() const |
7c673cae FG |
55 | { |
56 | if (id.length() > MAX_ID_LEN) { | |
57 | return false; | |
58 | } | |
e306af50 TL |
59 | else if(expiration.empty() && noncur_expiration.empty() && |
60 | mp_expiration.empty() && !dm_expiration && | |
11fdf7f2 | 61 | transitions.empty() && noncur_transitions.empty()) { |
7c673cae FG |
62 | return false; |
63 | } | |
e306af50 TL |
64 | else if (!expiration.valid() || !noncur_expiration.valid() || |
65 | !mp_expiration.valid()) { | |
7c673cae FG |
66 | return false; |
67 | } | |
11fdf7f2 TL |
68 | if (!transitions.empty()) { |
69 | bool using_days = expiration.has_days(); | |
70 | bool using_date = expiration.has_date(); | |
71 | for (const auto& elem : transitions) { | |
72 | if (!elem.second.valid()) { | |
73 | return false; | |
74 | } | |
75 | using_days = using_days || elem.second.has_days(); | |
76 | using_date = using_date || elem.second.has_date(); | |
77 | if (using_days && using_date) { | |
78 | return false; | |
79 | } | |
80 | } | |
81 | } | |
82 | for (const auto& elem : noncur_transitions) { | |
83 | if (!elem.second.valid()) { | |
84 | return false; | |
85 | } | |
86 | } | |
87 | ||
7c673cae FG |
88 | return true; |
89 | } | |
90 | ||
e306af50 TL |
91 | void LCRule::init_simple_days_rule(std::string_view _id, |
92 | std::string_view _prefix, int num_days) | |
11fdf7f2 TL |
93 | { |
94 | id = _id; | |
95 | prefix = _prefix; | |
96 | char buf[32]; | |
97 | snprintf(buf, sizeof(buf), "%d", num_days); | |
98 | expiration.set_days(buf); | |
99 | set_enabled(true); | |
100 | } | |
101 | ||
102 | void RGWLifecycleConfiguration::add_rule(const LCRule& rule) | |
7c673cae | 103 | { |
11fdf7f2 TL |
104 | auto& id = rule.get_id(); // note that this will return false for groups, but that's ok, we won't search groups |
105 | rule_map.insert(pair<string, LCRule>(id, rule)); | |
7c673cae FG |
106 | } |
107 | ||
11fdf7f2 | 108 | bool RGWLifecycleConfiguration::_add_rule(const LCRule& rule) |
7c673cae | 109 | { |
494da23a | 110 | lc_op op(rule.get_id()); |
11fdf7f2 TL |
111 | op.status = rule.is_enabled(); |
112 | if (rule.get_expiration().has_days()) { | |
113 | op.expiration = rule.get_expiration().get_days(); | |
7c673cae | 114 | } |
11fdf7f2 TL |
115 | if (rule.get_expiration().has_date()) { |
116 | op.expiration_date = ceph::from_iso_8601(rule.get_expiration().get_date()); | |
7c673cae | 117 | } |
11fdf7f2 TL |
118 | if (rule.get_noncur_expiration().has_days()) { |
119 | op.noncur_expiration = rule.get_noncur_expiration().get_days(); | |
224ce89b | 120 | } |
11fdf7f2 TL |
121 | if (rule.get_mp_expiration().has_days()) { |
122 | op.mp_expiration = rule.get_mp_expiration().get_days(); | |
7c673cae | 123 | } |
11fdf7f2 TL |
124 | op.dm_expiration = rule.get_dm_expiration(); |
125 | for (const auto &elem : rule.get_transitions()) { | |
126 | transition_action action; | |
127 | if (elem.second.has_days()) { | |
128 | action.days = elem.second.get_days(); | |
129 | } else { | |
130 | action.date = ceph::from_iso_8601(elem.second.get_date()); | |
131 | } | |
e306af50 TL |
132 | action.storage_class |
133 | = rgw_placement_rule::get_canonical_storage_class(elem.first); | |
11fdf7f2 TL |
134 | op.transitions.emplace(elem.first, std::move(action)); |
135 | } | |
136 | for (const auto &elem : rule.get_noncur_transitions()) { | |
137 | transition_action action; | |
138 | action.days = elem.second.get_days(); | |
139 | action.date = ceph::from_iso_8601(elem.second.get_date()); | |
140 | action.storage_class = elem.first; | |
141 | op.noncur_transitions.emplace(elem.first, std::move(action)); | |
7c673cae | 142 | } |
181888fb | 143 | std::string prefix; |
11fdf7f2 TL |
144 | if (rule.get_filter().has_prefix()){ |
145 | prefix = rule.get_filter().get_prefix(); | |
181888fb | 146 | } else { |
11fdf7f2 TL |
147 | prefix = rule.get_prefix(); |
148 | } | |
149 | ||
150 | if (rule.get_filter().has_tags()){ | |
151 | op.obj_tags = rule.get_filter().get_tags(); | |
181888fb | 152 | } |
494da23a TL |
153 | prefix_map.emplace(std::move(prefix), std::move(op)); |
154 | return true; | |
7c673cae FG |
155 | } |
156 | ||
11fdf7f2 | 157 | int RGWLifecycleConfiguration::check_and_add_rule(const LCRule& rule) |
7c673cae | 158 | { |
11fdf7f2 | 159 | if (!rule.valid()) { |
7c673cae FG |
160 | return -EINVAL; |
161 | } | |
11fdf7f2 | 162 | auto& id = rule.get_id(); |
7c673cae FG |
163 | if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same |
164 | return -EINVAL; | |
165 | } | |
e306af50 TL |
166 | if (rule.get_filter().has_tags() && (rule.get_dm_expiration() || |
167 | !rule.get_mp_expiration().empty())) { | |
9f95a23c TL |
168 | return -ERR_INVALID_REQUEST; |
169 | } | |
11fdf7f2 | 170 | rule_map.insert(pair<string, LCRule>(id, rule)); |
7c673cae FG |
171 | |
172 | if (!_add_rule(rule)) { | |
173 | return -ERR_INVALID_REQUEST; | |
174 | } | |
175 | return 0; | |
176 | } | |
177 | ||
e306af50 TL |
178 | bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, |
179 | const lc_op& second) { | |
224ce89b WB |
180 | if ((first.expiration > 0 || first.expiration_date != boost::none) && |
181 | (second.expiration > 0 || second.expiration_date != boost::none)) { | |
182 | return true; | |
183 | } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) { | |
184 | return true; | |
185 | } else if (first.mp_expiration > 0 && second.mp_expiration > 0) { | |
186 | return true; | |
11fdf7f2 TL |
187 | } else if (!first.transitions.empty() && !second.transitions.empty()) { |
188 | for (auto &elem : first.transitions) { | |
189 | if (second.transitions.find(elem.first) != second.transitions.end()) { | |
190 | return true; | |
191 | } | |
192 | } | |
e306af50 TL |
193 | } else if (!first.noncur_transitions.empty() && |
194 | !second.noncur_transitions.empty()) { | |
11fdf7f2 | 195 | for (auto &elem : first.noncur_transitions) { |
e306af50 TL |
196 | if (second.noncur_transitions.find(elem.first) != |
197 | second.noncur_transitions.end()) { | |
11fdf7f2 TL |
198 | return true; |
199 | } | |
200 | } | |
224ce89b | 201 | } |
11fdf7f2 | 202 | return false; |
224ce89b WB |
203 | } |
204 | ||
494da23a TL |
205 | /* Formerly, this method checked for duplicate rules using an invalid |
206 | * method (prefix uniqueness). */ | |
224ce89b | 207 | bool RGWLifecycleConfiguration::valid() |
7c673cae | 208 | { |
7c673cae FG |
209 | return true; |
210 | } | |
211 | ||
212 | void *RGWLC::LCWorker::entry() { | |
213 | do { | |
214 | utime_t start = ceph_clock_now(); | |
215 | if (should_work(start)) { | |
11fdf7f2 | 216 | ldpp_dout(dpp, 2) << "life cycle: start" << dendl; |
f6b5b4d7 | 217 | int r = lc->process(this, false /* once */); |
7c673cae | 218 | if (r < 0) { |
f6b5b4d7 TL |
219 | ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" |
220 | << r << dendl; | |
7c673cae | 221 | } |
11fdf7f2 | 222 | ldpp_dout(dpp, 2) << "life cycle: stop" << dendl; |
7c673cae FG |
223 | } |
224 | if (lc->going_down()) | |
225 | break; | |
226 | ||
227 | utime_t end = ceph_clock_now(); | |
228 | int secs = schedule_next_start_time(start, end); | |
c07f9fc5 FG |
229 | utime_t next; |
230 | next.set_from_double(end + secs); | |
231 | ||
f6b5b4d7 TL |
232 | ldpp_dout(dpp, 5) << "schedule life cycle next start time: " |
233 | << rgw_to_asctime(next) << dendl; | |
7c673cae | 234 | |
9f95a23c TL |
235 | std::unique_lock l{lock}; |
236 | cond.wait_for(l, std::chrono::seconds(secs)); | |
7c673cae FG |
237 | } while (!lc->going_down()); |
238 | ||
239 | return NULL; | |
240 | } | |
241 | ||
9f95a23c | 242 | void RGWLC::initialize(CephContext *_cct, rgw::sal::RGWRadosStore *_store) { |
7c673cae FG |
243 | cct = _cct; |
244 | store = _store; | |
245 | max_objs = cct->_conf->rgw_lc_max_objs; | |
246 | if (max_objs > HASH_PRIME) | |
247 | max_objs = HASH_PRIME; | |
248 | ||
249 | obj_names = new string[max_objs]; | |
250 | ||
251 | for (int i = 0; i < max_objs; i++) { | |
252 | obj_names[i] = lc_oid_prefix; | |
253 | char buf[32]; | |
254 | snprintf(buf, 32, ".%d", i); | |
255 | obj_names[i].append(buf); | |
256 | } | |
257 | ||
258 | #define COOKIE_LEN 16 | |
259 | char cookie_buf[COOKIE_LEN + 1]; | |
260 | gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1); | |
261 | cookie = cookie_buf; | |
262 | } | |
263 | ||
264 | void RGWLC::finalize() | |
265 | { | |
266 | delete[] obj_names; | |
267 | } | |
268 | ||
f6b5b4d7 | 269 | bool RGWLC::if_already_run_today(time_t start_date) |
7c673cae FG |
270 | { |
271 | struct tm bdt; | |
272 | time_t begin_of_day; | |
273 | utime_t now = ceph_clock_now(); | |
274 | localtime_r(&start_date, &bdt); | |
275 | ||
276 | if (cct->_conf->rgw_lc_debug_interval > 0) { | |
3efd9988 FG |
277 | if (now - start_date < cct->_conf->rgw_lc_debug_interval) |
278 | return true; | |
279 | else | |
280 | return false; | |
7c673cae FG |
281 | } |
282 | ||
283 | bdt.tm_hour = 0; | |
284 | bdt.tm_min = 0; | |
285 | bdt.tm_sec = 0; | |
286 | begin_of_day = mktime(&bdt); | |
287 | if (now - begin_of_day < 24*60*60) | |
288 | return true; | |
289 | else | |
290 | return false; | |
291 | } | |
292 | ||
f6b5b4d7 TL |
293 | static inline std::ostream& operator<<(std::ostream &os, cls_rgw_lc_entry& ent) { |
294 | os << "<ent: bucket="; | |
295 | os << ent.bucket; | |
296 | os << "; start_time="; | |
297 | os << rgw_to_asctime(utime_t(time_t(ent.start_time), 0)); | |
298 | os << "; status="; | |
299 | os << ent.status; | |
300 | os << ">"; | |
301 | return os; | |
302 | } | |
303 | ||
e306af50 | 304 | int RGWLC::bucket_lc_prepare(int index, LCWorker* worker) |
7c673cae | 305 | { |
f6b5b4d7 | 306 | vector<cls_rgw_lc_entry> entries; |
7c673cae FG |
307 | string marker; |
308 | ||
f6b5b4d7 TL |
309 | dout(5) << "RGWLC::bucket_lc_prepare(): PREPARE " |
310 | << "index: " << index << " worker ix: " << worker->ix | |
311 | << dendl; | |
312 | ||
7c673cae FG |
313 | #define MAX_LC_LIST_ENTRIES 100 |
314 | do { | |
e306af50 TL |
315 | int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], |
316 | marker, MAX_LC_LIST_ENTRIES, entries); | |
7c673cae FG |
317 | if (ret < 0) |
318 | return ret; | |
f6b5b4d7 TL |
319 | |
320 | for (auto& entry : entries) { | |
321 | entry.start_time = ceph_clock_now(); | |
322 | entry.status = lc_uninitial; // lc_uninitial? really? | |
e306af50 | 323 | ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, |
f6b5b4d7 | 324 | obj_names[index], entry); |
7c673cae | 325 | if (ret < 0) { |
f6b5b4d7 TL |
326 | ldpp_dout(this, 0) |
327 | << "RGWLC::bucket_lc_prepare() failed to set entry on " | |
328 | << obj_names[index] << dendl; | |
11fdf7f2 | 329 | return ret; |
7c673cae | 330 | } |
11fdf7f2 TL |
331 | } |
332 | ||
f6b5b4d7 TL |
333 | if (! entries.empty()) { |
334 | marker = std::move(entries.back().bucket); | |
7c673cae FG |
335 | } |
336 | } while (!entries.empty()); | |
337 | ||
338 | return 0; | |
339 | } | |
340 | ||
e306af50 TL |
341 | static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days, |
342 | ceph::real_time *expire_time = nullptr) | |
7c673cae | 343 | { |
11fdf7f2 TL |
344 | double timediff, cmp; |
345 | utime_t base_time; | |
31f18b77 FG |
346 | if (cct->_conf->rgw_lc_debug_interval <= 0) { |
347 | /* Normal case, run properly */ | |
348 | cmp = days*24*60*60; | |
11fdf7f2 | 349 | base_time = ceph_clock_now().round_to_day(); |
31f18b77 FG |
350 | } else { |
351 | /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */ | |
352 | cmp = days*cct->_conf->rgw_lc_debug_interval; | |
11fdf7f2 | 353 | base_time = ceph_clock_now(); |
31f18b77 | 354 | } |
f6b5b4d7 TL |
355 | auto tt_mtime = ceph::real_clock::to_time_t(mtime); |
356 | timediff = base_time - tt_mtime; | |
7c673cae | 357 | |
11fdf7f2 TL |
358 | if (expire_time) { |
359 | *expire_time = mtime + make_timespan(cmp); | |
7c673cae | 360 | } |
f6b5b4d7 TL |
361 | |
362 | ldout(cct, 20) << __func__ << __func__ | |
363 | << "(): mtime=" << mtime << " days=" << days | |
364 | << " base_time=" << base_time << " timediff=" << timediff | |
365 | << " cmp=" << cmp | |
366 | << " is_expired=" << (timediff >= cmp) | |
367 | << dendl; | |
11fdf7f2 TL |
368 | |
369 | return (timediff >= cmp); | |
7c673cae FG |
370 | } |
371 | ||
e306af50 TL |
372 | static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info, |
373 | rgw_obj& obj, RGWObjectCtx& ctx) | |
eafe8130 TL |
374 | { |
375 | if (!bucket_info.obj_lock_enabled()) { | |
376 | return true; | |
377 | } | |
378 | RGWRados::Object op_target(store, bucket_info, ctx, obj); | |
379 | RGWRados::Object::Read read_op(&op_target); | |
380 | map<string, bufferlist> attrs; | |
381 | read_op.params.attrs = &attrs; | |
9f95a23c | 382 | int ret = read_op.prepare(null_yield); |
eafe8130 TL |
383 | if (ret < 0) { |
384 | if (ret == -ENOENT) { | |
385 | return true; | |
386 | } else { | |
387 | return false; | |
388 | } | |
389 | } else { | |
390 | auto iter = attrs.find(RGW_ATTR_OBJECT_RETENTION); | |
391 | if (iter != attrs.end()) { | |
392 | RGWObjectRetention retention; | |
393 | try { | |
394 | decode(retention, iter->second); | |
395 | } catch (buffer::error& err) { | |
f6b5b4d7 TL |
396 | ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention" |
397 | << dendl; | |
eafe8130 TL |
398 | return false; |
399 | } | |
e306af50 TL |
400 | if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) > |
401 | ceph_clock_now()) { | |
eafe8130 TL |
402 | return false; |
403 | } | |
404 | } | |
405 | iter = attrs.find(RGW_ATTR_OBJECT_LEGAL_HOLD); | |
406 | if (iter != attrs.end()) { | |
407 | RGWObjectLegalHold obj_legal_hold; | |
408 | try { | |
409 | decode(obj_legal_hold, iter->second); | |
410 | } catch (buffer::error& err) { | |
f6b5b4d7 TL |
411 | ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectLegalHold" |
412 | << dendl; | |
eafe8130 TL |
413 | return false; |
414 | } | |
415 | if (obj_legal_hold.is_enabled()) { | |
416 | return false; | |
417 | } | |
418 | } | |
419 | return true; | |
420 | } | |
421 | } | |
422 | ||
11fdf7f2 | 423 | class LCObjsLister { |
9f95a23c | 424 | rgw::sal::RGWRadosStore *store; |
11fdf7f2 TL |
425 | RGWBucketInfo& bucket_info; |
426 | RGWRados::Bucket target; | |
427 | RGWRados::Bucket::List list_op; | |
428 | bool is_truncated{false}; | |
429 | rgw_obj_key next_marker; | |
430 | string prefix; | |
431 | vector<rgw_bucket_dir_entry> objs; | |
432 | vector<rgw_bucket_dir_entry>::iterator obj_iter; | |
433 | rgw_bucket_dir_entry pre_obj; | |
434 | int64_t delay_ms; | |
435 | ||
436 | public: | |
9f95a23c | 437 | LCObjsLister(rgw::sal::RGWRadosStore *_store, RGWBucketInfo& _bucket_info) : |
11fdf7f2 | 438 | store(_store), bucket_info(_bucket_info), |
9f95a23c | 439 | target(store->getRados(), bucket_info), list_op(&target) { |
11fdf7f2 TL |
440 | list_op.params.list_versions = bucket_info.versioned(); |
441 | list_op.params.allow_unordered = true; | |
442 | delay_ms = store->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay"); | |
443 | } | |
444 | ||
445 | void set_prefix(const string& p) { | |
446 | prefix = p; | |
447 | list_op.params.prefix = prefix; | |
448 | } | |
449 | ||
450 | int init() { | |
451 | return fetch(); | |
452 | } | |
453 | ||
454 | int fetch() { | |
e306af50 TL |
455 | int ret = list_op.list_objects( |
456 | 1000, &objs, NULL, &is_truncated, null_yield); | |
11fdf7f2 TL |
457 | if (ret < 0) { |
458 | return ret; | |
459 | } | |
460 | ||
461 | obj_iter = objs.begin(); | |
462 | ||
463 | return 0; | |
464 | } | |
465 | ||
466 | void delay() { | |
467 | std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); | |
468 | } | |
469 | ||
e306af50 TL |
470 | bool get_obj(rgw_bucket_dir_entry **obj, |
471 | std::function<void(void)> fetch_barrier | |
472 | = []() { /* nada */}) { | |
11fdf7f2 | 473 | if (obj_iter == objs.end()) { |
9f95a23c TL |
474 | if (!is_truncated) { |
475 | delay(); | |
476 | return false; | |
11fdf7f2 | 477 | } else { |
e306af50 | 478 | fetch_barrier(); |
9f95a23c | 479 | list_op.params.marker = pre_obj.key; |
9f95a23c TL |
480 | int ret = fetch(); |
481 | if (ret < 0) { | |
e306af50 TL |
482 | ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret |
483 | << dendl; | |
9f95a23c TL |
484 | return ret; |
485 | } | |
11fdf7f2 TL |
486 | } |
487 | delay(); | |
488 | } | |
e306af50 TL |
489 | /* returning address of entry in objs */ |
490 | *obj = &(*obj_iter); | |
9f95a23c | 491 | return obj_iter != objs.end(); |
11fdf7f2 TL |
492 | } |
493 | ||
494 | rgw_bucket_dir_entry get_prev_obj() { | |
495 | return pre_obj; | |
496 | } | |
497 | ||
498 | void next() { | |
499 | pre_obj = *obj_iter; | |
500 | ++obj_iter; | |
501 | } | |
502 | ||
f6b5b4d7 TL |
503 | boost::optional<std::string> next_key_name() { |
504 | if (obj_iter == objs.end() || | |
505 | (obj_iter + 1) == objs.end()) { | |
11fdf7f2 TL |
506 | /* this should have been called after get_obj() was called, so this should |
507 | * only happen if is_truncated is false */ | |
f6b5b4d7 | 508 | return boost::none; |
11fdf7f2 | 509 | } |
f6b5b4d7 TL |
510 | |
511 | return ((obj_iter + 1)->key.name); | |
11fdf7f2 | 512 | } |
f6b5b4d7 | 513 | |
e306af50 | 514 | }; /* LCObjsLister */ |
11fdf7f2 TL |
515 | |
516 | struct op_env { | |
e306af50 TL |
517 | |
518 | using LCWorker = RGWLC::LCWorker; | |
519 | ||
f6b5b4d7 | 520 | lc_op op; |
9f95a23c | 521 | rgw::sal::RGWRadosStore *store; |
e306af50 | 522 | LCWorker* worker; |
11fdf7f2 TL |
523 | RGWBucketInfo& bucket_info; |
524 | LCObjsLister& ol; | |
525 | ||
e306af50 TL |
526 | op_env(lc_op& _op, rgw::sal::RGWRadosStore *_store, LCWorker* _worker, |
527 | RGWBucketInfo& _bucket_info, LCObjsLister& _ol) | |
528 | : op(_op), store(_store), worker(_worker), bucket_info(_bucket_info), | |
529 | ol(_ol) {} | |
530 | }; /* op_env */ | |
11fdf7f2 TL |
531 | |
532 | class LCRuleOp; | |
f6b5b4d7 | 533 | class WorkQ; |
11fdf7f2 TL |
534 | |
535 | struct lc_op_ctx { | |
536 | CephContext *cct; | |
f6b5b4d7 TL |
537 | op_env env; |
538 | rgw_bucket_dir_entry o; | |
539 | boost::optional<std::string> next_key_name; | |
540 | ceph::real_time effective_mtime; | |
11fdf7f2 | 541 | |
9f95a23c | 542 | rgw::sal::RGWRadosStore *store; |
11fdf7f2 | 543 | RGWBucketInfo& bucket_info; |
f6b5b4d7 | 544 | lc_op& op; // ok--refers to expanded env.op |
11fdf7f2 TL |
545 | LCObjsLister& ol; |
546 | ||
547 | rgw_obj obj; | |
548 | RGWObjectCtx rctx; | |
9f95a23c | 549 | const DoutPrefixProvider *dpp; |
f6b5b4d7 TL |
550 | WorkQ* wq; |
551 | ||
552 | lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o, | |
553 | boost::optional<std::string> next_key_name, | |
554 | ceph::real_time effective_mtime, | |
555 | const DoutPrefixProvider *dpp, WorkQ* wq) | |
556 | : cct(env.store->ctx()), env(env), o(o), next_key_name(next_key_name), | |
557 | effective_mtime(effective_mtime), | |
e306af50 | 558 | store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol), |
f6b5b4d7 TL |
559 | obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(dpp), wq(wq) |
560 | {} | |
561 | ||
562 | bool next_has_same_name(const std::string& key_name) { | |
563 | return (next_key_name && key_name.compare( | |
564 | boost::get<std::string>(next_key_name)) == 0); | |
565 | } | |
566 | ||
e306af50 | 567 | }; /* lc_op_ctx */ |
11fdf7f2 TL |
568 | |
569 | static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed) | |
570 | { | |
571 | auto& store = oc.store; | |
572 | auto& bucket_info = oc.bucket_info; | |
573 | auto& o = oc.o; | |
574 | auto obj_key = o.key; | |
575 | auto& meta = o.meta; | |
576 | ||
577 | if (!remove_indeed) { | |
578 | obj_key.instance.clear(); | |
579 | } else if (obj_key.instance.empty()) { | |
580 | obj_key.instance = "null"; | |
581 | } | |
582 | ||
583 | rgw_obj obj(bucket_info.bucket, obj_key); | |
584 | ACLOwner obj_owner; | |
585 | obj_owner.set_id(rgw_user {meta.owner}); | |
586 | obj_owner.set_name(meta.owner_display_name); | |
587 | ||
9f95a23c | 588 | RGWRados::Object del_target(store->getRados(), bucket_info, oc.rctx, obj); |
11fdf7f2 TL |
589 | RGWRados::Object::Delete del_op(&del_target); |
590 | ||
591 | del_op.params.bucket_owner = bucket_info.owner; | |
592 | del_op.params.versioning_status = bucket_info.versioning_status(); | |
593 | del_op.params.obj_owner = obj_owner; | |
594 | del_op.params.unmod_since = meta.mtime; | |
595 | ||
9f95a23c | 596 | return del_op.delete_obj(null_yield); |
e306af50 | 597 | } /* remove_expired_obj */ |
11fdf7f2 TL |
598 | |
599 | class LCOpAction { | |
600 | public: | |
601 | virtual ~LCOpAction() {} | |
602 | ||
603 | virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time) { | |
604 | return false; | |
f6b5b4d7 | 605 | } |
11fdf7f2 TL |
606 | |
607 | /* called after check(). Check should tell us whether this action | |
608 | * is applicable. If there are multiple actions, we'll end up executing | |
609 | * the latest applicable action | |
610 | * For example: | |
611 | * one action after 10 days, another after 20, third after 40. | |
612 | * After 10 days, the latest applicable action would be the first one, | |
613 | * after 20 days it will be the second one. After 21 days it will still be the | |
614 | * second one. So check() should return true for the second action at that point, | |
615 | * but should_process() if the action has already been applied. In object removal | |
616 | * it doesn't matter, but in object transition it does. | |
617 | */ | |
618 | virtual bool should_process() { | |
619 | return true; | |
620 | } | |
621 | ||
622 | virtual int process(lc_op_ctx& oc) { | |
623 | return 0; | |
624 | } | |
f6b5b4d7 TL |
625 | |
626 | friend class LCOpRule; | |
e306af50 | 627 | }; /* LCOpAction */ |
11fdf7f2 TL |
628 | |
629 | class LCOpFilter { | |
630 | public: | |
631 | virtual ~LCOpFilter() {} | |
632 | virtual bool check(lc_op_ctx& oc) { | |
633 | return false; | |
634 | } | |
e306af50 | 635 | }; /* LCOpFilter */ |
11fdf7f2 TL |
636 | |
637 | class LCOpRule { | |
638 | friend class LCOpAction; | |
639 | ||
f6b5b4d7 TL |
640 | op_env env; |
641 | boost::optional<std::string> next_key_name; | |
642 | ceph::real_time effective_mtime; | |
11fdf7f2 | 643 | |
f6b5b4d7 TL |
644 | std::vector<shared_ptr<LCOpFilter> > filters; // n.b., sharing ovhd |
645 | std::vector<shared_ptr<LCOpAction> > actions; | |
11fdf7f2 TL |
646 | |
647 | public: | |
648 | LCOpRule(op_env& _env) : env(_env) {} | |
649 | ||
f6b5b4d7 TL |
650 | boost::optional<std::string> get_next_key_name() { |
651 | return next_key_name; | |
652 | } | |
653 | ||
654 | std::vector<shared_ptr<LCOpAction>>& get_actions() { | |
655 | return actions; | |
656 | } | |
657 | ||
11fdf7f2 | 658 | void build(); |
f6b5b4d7 TL |
659 | void update(); |
660 | int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp, | |
661 | WorkQ* wq); | |
e306af50 TL |
662 | }; /* LCOpRule */ |
663 | ||
664 | using WorkItem = | |
665 | boost::variant<void*, | |
666 | /* out-of-line delete */ | |
f6b5b4d7 | 667 | std::tuple<LCOpRule, rgw_bucket_dir_entry>, |
e306af50 | 668 | /* uncompleted MPU expiration */ |
f6b5b4d7 | 669 | std::tuple<lc_op, rgw_bucket_dir_entry>, |
e306af50 TL |
670 | rgw_bucket_dir_entry>; |
671 | ||
672 | class WorkQ : public Thread | |
673 | { | |
674 | public: | |
675 | using unique_lock = std::unique_lock<std::mutex>; | |
f6b5b4d7 | 676 | using work_f = std::function<void(RGWLC::LCWorker*, WorkQ*, WorkItem&)>; |
e306af50 TL |
677 | using dequeue_result = boost::variant<void*, WorkItem>; |
678 | ||
f6b5b4d7 TL |
679 | static constexpr uint32_t FLAG_NONE = 0x0000; |
680 | static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001; | |
681 | static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002; | |
682 | static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004; | |
683 | ||
e306af50 | 684 | private: |
f6b5b4d7 | 685 | const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {}; |
e306af50 TL |
686 | RGWLC::LCWorker* wk; |
687 | uint32_t qmax; | |
f6b5b4d7 | 688 | int ix; |
e306af50 TL |
689 | std::mutex mtx; |
690 | std::condition_variable cv; | |
f6b5b4d7 | 691 | uint32_t flags; |
e306af50 TL |
692 | vector<WorkItem> items; |
693 | work_f f; | |
694 | ||
695 | public: | |
696 | WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax) | |
f6b5b4d7 | 697 | : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf) |
e306af50 | 698 | { |
f6b5b4d7 | 699 | create(thr_name().c_str()); |
e306af50 TL |
700 | } |
701 | ||
f6b5b4d7 TL |
702 | std::string thr_name() { |
703 | return std::string{"wp_thrd: "} | |
704 | + std::to_string(wk->ix) + ", " + std::to_string(ix); | |
705 | } | |
706 | ||
e306af50 TL |
707 | void setf(work_f _f) { |
708 | f = _f; | |
709 | } | |
710 | ||
711 | void enqueue(WorkItem&& item) { | |
712 | unique_lock uniq(mtx); | |
713 | while ((!wk->get_lc()->going_down()) && | |
714 | (items.size() > qmax)) { | |
f6b5b4d7 | 715 | flags |= FLAG_EWAIT_SYNC; |
e306af50 TL |
716 | cv.wait_for(uniq, 200ms); |
717 | } | |
718 | items.push_back(item); | |
f6b5b4d7 TL |
719 | if (flags & FLAG_DWAIT_SYNC) { |
720 | flags &= ~FLAG_DWAIT_SYNC; | |
721 | cv.notify_one(); | |
722 | } | |
e306af50 TL |
723 | } |
724 | ||
725 | void drain() { | |
726 | unique_lock uniq(mtx); | |
f6b5b4d7 TL |
727 | flags |= FLAG_EDRAIN_SYNC; |
728 | while (flags & FLAG_EDRAIN_SYNC) { | |
e306af50 TL |
729 | cv.wait_for(uniq, 200ms); |
730 | } | |
731 | } | |
732 | ||
733 | private: | |
734 | dequeue_result dequeue() { | |
735 | unique_lock uniq(mtx); | |
736 | while ((!wk->get_lc()->going_down()) && | |
737 | (items.size() == 0)) { | |
f6b5b4d7 TL |
738 | /* clear drain state, as we are NOT doing work and qlen==0 */ |
739 | if (flags & FLAG_EDRAIN_SYNC) { | |
740 | flags &= ~FLAG_EDRAIN_SYNC; | |
741 | } | |
742 | flags |= FLAG_DWAIT_SYNC; | |
e306af50 TL |
743 | cv.wait_for(uniq, 200ms); |
744 | } | |
745 | if (items.size() > 0) { | |
746 | auto item = items.back(); | |
747 | items.pop_back(); | |
f6b5b4d7 TL |
748 | if (flags & FLAG_EWAIT_SYNC) { |
749 | flags &= ~FLAG_EWAIT_SYNC; | |
750 | cv.notify_one(); | |
751 | } | |
e306af50 TL |
752 | return {item}; |
753 | } | |
754 | return nullptr; | |
755 | } | |
756 | ||
757 | void* entry() override { | |
758 | while (!wk->get_lc()->going_down()) { | |
759 | auto item = dequeue(); | |
760 | if (item.which() == 0) { | |
761 | /* going down */ | |
762 | break; | |
763 | } | |
f6b5b4d7 | 764 | f(wk, this, boost::get<WorkItem>(item)); |
e306af50 TL |
765 | } |
766 | return nullptr; | |
767 | } | |
768 | }; /* WorkQ */ | |
769 | ||
770 | class RGWLC::WorkPool | |
771 | { | |
772 | using TVector = ceph::containers::tiny_vector<WorkQ, 3>; | |
773 | TVector wqs; | |
774 | uint64_t ix; | |
775 | ||
776 | public: | |
777 | WorkPool(RGWLC::LCWorker* wk, uint16_t n_threads, uint32_t qmax) | |
778 | : wqs(TVector{ | |
779 | n_threads, | |
780 | [&](const size_t ix, auto emplacer) { | |
781 | emplacer.emplace(wk, ix, qmax); | |
782 | }}), | |
783 | ix(0) | |
784 | {} | |
785 | ||
f6b5b4d7 TL |
786 | ~WorkPool() { |
787 | for (auto& wq : wqs) { | |
788 | wq.join(); | |
789 | } | |
790 | } | |
791 | ||
e306af50 TL |
792 | void setf(WorkQ::work_f _f) { |
793 | for (auto& wq : wqs) { | |
794 | wq.setf(_f); | |
795 | } | |
796 | } | |
797 | ||
798 | void enqueue(WorkItem item) { | |
799 | const auto tix = ix; | |
800 | ix = (ix+1) % wqs.size(); | |
801 | (wqs[tix]).enqueue(std::move(item)); | |
802 | } | |
803 | ||
804 | void drain() { | |
805 | for (auto& wq : wqs) { | |
806 | wq.drain(); | |
807 | } | |
808 | } | |
809 | }; /* WorkPool */ | |
810 | ||
f6b5b4d7 TL |
811 | RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct, |
812 | RGWLC *lc, int ix) | |
813 | : dpp(dpp), cct(cct), lc(lc), ix(ix) | |
e306af50 TL |
814 | { |
815 | auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker"); | |
816 | workpool = new WorkPool(this, wpw, 512); | |
817 | } | |
818 | ||
f6b5b4d7 TL |
/* A worker stops once the deadline `stop_at` has passed, unless it was
 * started in `once` mode, in which case it never stops on time. */
static inline bool worker_should_stop(time_t stop_at, bool once)
{
  if (once) {
    return false;
  }
  return stop_at < time(nullptr);
}
823 | ||
e306af50 TL |
824 | int RGWLC::handle_multipart_expiration( |
825 | RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map, | |
f6b5b4d7 | 826 | LCWorker* worker, time_t stop_at, bool once) |
e306af50 TL |
827 | { |
828 | MultipartMetaFilter mp_filter; | |
829 | vector<rgw_bucket_dir_entry> objs; | |
830 | bool is_truncated; | |
831 | int ret; | |
832 | RGWBucketInfo& bucket_info = target->get_bucket_info(); | |
833 | RGWRados::Bucket::List list_op(target); | |
834 | auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay"); | |
835 | list_op.params.list_versions = false; | |
836 | /* lifecycle processing does not depend on total order, so can | |
837 | * take advantage of unordered listing optimizations--such as | |
838 | * operating on one shard at a time */ | |
839 | list_op.params.allow_unordered = true; | |
840 | list_op.params.ns = RGW_OBJ_NS_MULTIPART; | |
841 | list_op.params.filter = &mp_filter; | |
842 | ||
f6b5b4d7 TL |
843 | auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) { |
844 | auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi); | |
e306af50 TL |
845 | auto& [rule, obj] = wt; |
846 | RGWMPObj mp_obj; | |
847 | if (obj_has_expired(cct, obj.meta.mtime, rule.mp_expiration)) { | |
848 | rgw_obj_key key(obj.key); | |
849 | if (!mp_obj.from_meta(key.name)) { | |
850 | return; | |
851 | } | |
852 | RGWObjectCtx rctx(store); | |
f6b5b4d7 TL |
853 | int ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj); |
854 | if (ret == 0) { | |
855 | if (perfcounter) { | |
856 | perfcounter->inc(l_rgw_lc_abort_mpu, 1); | |
857 | } | |
858 | } else { | |
859 | if (ret == -ERR_NO_SUCH_UPLOAD) { | |
860 | ldpp_dout(wk->get_lc(), 5) | |
861 | << "ERROR: abort_multipart_upload failed, ret=" << ret | |
862 | << wq->thr_name() | |
863 | << ", meta:" << obj.key | |
864 | << dendl; | |
865 | } else { | |
866 | ldpp_dout(wk->get_lc(), 0) | |
867 | << "ERROR: abort_multipart_upload failed, ret=" << ret | |
868 | << wq->thr_name() | |
869 | << ", meta:" << obj.key | |
870 | << dendl; | |
871 | } | |
872 | } /* abort failed */ | |
e306af50 TL |
873 | } /* expired */ |
874 | }; | |
875 | ||
876 | worker->workpool->setf(pf); | |
877 | ||
878 | for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); | |
879 | ++prefix_iter) { | |
f6b5b4d7 TL |
880 | |
881 | if (worker_should_stop(stop_at, once)) { | |
882 | ldout(cct, 5) << __func__ << " interval budget EXPIRED worker " | |
883 | << worker->ix | |
884 | << dendl; | |
885 | return 0; | |
886 | } | |
887 | ||
e306af50 TL |
888 | if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) { |
889 | continue; | |
890 | } | |
891 | list_op.params.prefix = prefix_iter->first; | |
892 | do { | |
893 | objs.clear(); | |
894 | list_op.params.marker = list_op.get_next_marker(); | |
895 | ret = list_op.list_objects(1000, &objs, NULL, &is_truncated, null_yield); | |
896 | if (ret < 0) { | |
897 | if (ret == (-ENOENT)) | |
898 | return 0; | |
899 | ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl; | |
900 | return ret; | |
901 | } | |
902 | ||
903 | for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) { | |
f6b5b4d7 | 904 | std::tuple<lc_op, rgw_bucket_dir_entry> t1 = |
e306af50 TL |
905 | {prefix_iter->second, *obj_iter}; |
906 | worker->workpool->enqueue(WorkItem{t1}); | |
907 | if (going_down()) { | |
e306af50 TL |
908 | return 0; |
909 | } | |
910 | } /* for objs */ | |
911 | ||
912 | std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); | |
913 | } while(is_truncated); | |
914 | } /* for prefix_map */ | |
915 | ||
916 | worker->workpool->drain(); | |
917 | return 0; | |
918 | } | |
919 | ||
920 | static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info, | |
921 | rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl) | |
922 | { | |
923 | RGWRados::Object op_target(store, bucket_info, ctx, obj); | |
924 | RGWRados::Object::Read read_op(&op_target); | |
925 | ||
926 | return read_op.get_attr(RGW_ATTR_TAGS, tags_bl, null_yield); | |
927 | } | |
928 | ||
929 | static bool is_valid_op(const lc_op& op) | |
930 | { | |
931 | return (op.status && | |
932 | (op.expiration > 0 | |
933 | || op.expiration_date != boost::none | |
934 | || op.noncur_expiration > 0 | |
935 | || op.dm_expiration | |
936 | || !op.transitions.empty() | |
937 | || !op.noncur_transitions.empty())); | |
938 | } | |
939 | ||
940 | static inline bool has_all_tags(const lc_op& rule_action, | |
941 | const RGWObjTags& object_tags) | |
942 | { | |
943 | if(! rule_action.obj_tags) | |
944 | return false; | |
945 | if(object_tags.count() < rule_action.obj_tags->count()) | |
946 | return false; | |
947 | size_t tag_count = 0; | |
948 | for (const auto& tag : object_tags.get_tags()) { | |
949 | const auto& rule_tags = rule_action.obj_tags->get_tags(); | |
950 | const auto& iter = rule_tags.find(tag.first); | |
f6b5b4d7 TL |
951 | if(iter == rule_tags.end()) |
952 | continue; | |
e306af50 TL |
953 | if(iter->second == tag.second) |
954 | { | |
955 | tag_count++; | |
956 | } | |
957 | /* all tags in the rule appear in obj tags */ | |
958 | } | |
959 | return tag_count == rule_action.obj_tags->count(); | |
960 | } | |
11fdf7f2 TL |
961 | |
962 | static int check_tags(lc_op_ctx& oc, bool *skip) | |
963 | { | |
964 | auto& op = oc.op; | |
965 | ||
966 | if (op.obj_tags != boost::none) { | |
967 | *skip = true; | |
968 | ||
969 | bufferlist tags_bl; | |
e306af50 TL |
970 | int ret = read_obj_tags(oc.store->getRados(), oc.bucket_info, oc.obj, |
971 | oc.rctx, tags_bl); | |
11fdf7f2 TL |
972 | if (ret < 0) { |
973 | if (ret != -ENODATA) { | |
f6b5b4d7 TL |
974 | ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" |
975 | << ret << " " << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
976 | } |
977 | return 0; | |
978 | } | |
979 | RGWObjTags dest_obj_tags; | |
980 | try { | |
981 | auto iter = tags_bl.cbegin(); | |
982 | dest_obj_tags.decode(iter); | |
983 | } catch (buffer::error& err) { | |
f6b5b4d7 TL |
984 | ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet " |
985 | << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
986 | return -EIO; |
987 | } | |
988 | ||
494da23a | 989 | if (! has_all_tags(op, dest_obj_tags)) { |
f6b5b4d7 TL |
990 | ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj |
991 | << " as tags do not match in rule: " | |
992 | << op.id << " " | |
993 | << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
994 | return 0; |
995 | } | |
996 | } | |
997 | *skip = false; | |
998 | return 0; | |
999 | } | |
1000 | ||
1001 | class LCOpFilter_Tags : public LCOpFilter { | |
1002 | public: | |
1003 | bool check(lc_op_ctx& oc) override { | |
1004 | auto& o = oc.o; | |
1005 | ||
1006 | if (o.is_delete_marker()) { | |
1007 | return true; | |
1008 | } | |
1009 | ||
1010 | bool skip; | |
1011 | ||
1012 | int ret = check_tags(oc, &skip); | |
1013 | if (ret < 0) { | |
1014 | if (ret == -ENOENT) { | |
1015 | return false; | |
1016 | } | |
f6b5b4d7 TL |
1017 | ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj |
1018 | << " returned ret=" << ret << " " | |
1019 | << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
1020 | return false; |
1021 | } | |
1022 | ||
1023 | return !skip; | |
1024 | }; | |
1025 | }; | |
1026 | ||
1027 | class LCOpAction_CurrentExpiration : public LCOpAction { | |
1028 | public: | |
f6b5b4d7 TL |
1029 | LCOpAction_CurrentExpiration(op_env& env) {} |
1030 | ||
11fdf7f2 TL |
1031 | bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override { |
1032 | auto& o = oc.o; | |
1033 | if (!o.is_current()) { | |
f6b5b4d7 TL |
1034 | ldout(oc.cct, 20) << __func__ << "(): key=" << o.key |
1035 | << ": not current, skipping " | |
1036 | << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
1037 | return false; |
1038 | } | |
eafe8130 | 1039 | if (o.is_delete_marker()) { |
f6b5b4d7 TL |
1040 | std::string nkn; |
1041 | if (oc.next_key_name) nkn = *oc.next_key_name; | |
1042 | if (oc.next_has_same_name(o.key.name)) { | |
1043 | ldout(oc.cct, 7) << __func__ << "(): dm-check SAME: key=" << o.key | |
1044 | << " next_key_name: %%" << nkn << "%% " | |
1045 | << oc.wq->thr_name() << dendl; | |
1046 | return false; | |
eafe8130 | 1047 | } else { |
f6b5b4d7 TL |
1048 | ldout(oc.cct, 7) << __func__ << "(): dm-check DELE: key=" << o.key |
1049 | << " next_key_name: %%" << nkn << "%% " | |
1050 | << oc.wq->thr_name() << dendl; | |
eafe8130 TL |
1051 | *exp_time = real_clock::now(); |
1052 | return true; | |
1053 | } | |
1054 | } | |
11fdf7f2 TL |
1055 | |
1056 | auto& mtime = o.meta.mtime; | |
1057 | bool is_expired; | |
1058 | auto& op = oc.op; | |
1059 | if (op.expiration <= 0) { | |
1060 | if (op.expiration_date == boost::none) { | |
f6b5b4d7 TL |
1061 | ldout(oc.cct, 20) << __func__ << "(): key=" << o.key |
1062 | << ": no expiration set in rule, skipping " | |
1063 | << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
1064 | return false; |
1065 | } | |
e306af50 TL |
1066 | is_expired = ceph_clock_now() >= |
1067 | ceph::real_clock::to_time_t(*op.expiration_date); | |
11fdf7f2 TL |
1068 | *exp_time = *op.expiration_date; |
1069 | } else { | |
1070 | is_expired = obj_has_expired(oc.cct, mtime, op.expiration, exp_time); | |
1071 | } | |
1072 | ||
f6b5b4d7 TL |
1073 | ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" |
1074 | << (int)is_expired << " " | |
1075 | << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
1076 | return is_expired; |
1077 | } | |
1078 | ||
1079 | int process(lc_op_ctx& oc) { | |
1080 | auto& o = oc.o; | |
eafe8130 TL |
1081 | int r; |
1082 | if (o.is_delete_marker()) { | |
1083 | r = remove_expired_obj(oc, true); | |
f6b5b4d7 TL |
1084 | if (r < 0) { |
1085 | ldout(oc.cct, 0) << "ERROR: current is-dm remove_expired_obj " | |
1086 | << oc.bucket_info.bucket << ":" << o.key | |
1087 | << " " << cpp_strerror(r) << " " | |
1088 | << oc.wq->thr_name() << dendl; | |
1089 | return r; | |
1090 | } | |
1091 | ldout(oc.cct, 2) << "DELETED: current is-dm " | |
1092 | << oc.bucket_info.bucket << ":" << o.key | |
1093 | << " " << oc.wq->thr_name() << dendl; | |
eafe8130 | 1094 | } else { |
f6b5b4d7 | 1095 | /* ! o.is_delete_marker() */ |
eafe8130 | 1096 | r = remove_expired_obj(oc, !oc.bucket_info.versioned()); |
f6b5b4d7 TL |
1097 | if (r < 0) { |
1098 | ldout(oc.cct, 0) << "ERROR: remove_expired_obj " | |
1099 | << oc.bucket_info.bucket << ":" << o.key | |
1100 | << " " << cpp_strerror(r) << " " | |
1101 | << oc.wq->thr_name() << dendl; | |
1102 | return r; | |
1103 | } | |
1104 | if (perfcounter) { | |
1105 | perfcounter->inc(l_rgw_lc_expire_current, 1); | |
1106 | } | |
1107 | ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key | |
1108 | << " " << oc.wq->thr_name() << dendl; | |
eafe8130 | 1109 | } |
11fdf7f2 TL |
1110 | return 0; |
1111 | } | |
1112 | }; | |
1113 | ||
1114 | class LCOpAction_NonCurrentExpiration : public LCOpAction { | |
f6b5b4d7 | 1115 | protected: |
11fdf7f2 | 1116 | public: |
f6b5b4d7 TL |
1117 | LCOpAction_NonCurrentExpiration(op_env& env) |
1118 | {} | |
1119 | ||
11fdf7f2 TL |
1120 | bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override { |
1121 | auto& o = oc.o; | |
1122 | if (o.is_current()) { | |
f6b5b4d7 TL |
1123 | ldout(oc.cct, 20) << __func__ << "(): key=" << o.key |
1124 | << ": current version, skipping " | |
1125 | << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
1126 | return false; |
1127 | } | |
1128 | ||
11fdf7f2 | 1129 | int expiration = oc.op.noncur_expiration; |
f6b5b4d7 TL |
1130 | bool is_expired = obj_has_expired(oc.cct, oc.effective_mtime, expiration, |
1131 | exp_time); | |
1132 | ||
1133 | ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" | |
1134 | << is_expired << " " | |
1135 | << oc.wq->thr_name() << dendl; | |
11fdf7f2 | 1136 | |
e306af50 TL |
1137 | return is_expired && |
1138 | pass_object_lock_check(oc.store->getRados(), | |
1139 | oc.bucket_info, oc.obj, oc.rctx); | |
11fdf7f2 TL |
1140 | } |
1141 | ||
1142 | int process(lc_op_ctx& oc) { | |
1143 | auto& o = oc.o; | |
1144 | int r = remove_expired_obj(oc, true); | |
1145 | if (r < 0) { | |
9f95a23c | 1146 | ldout(oc.cct, 0) << "ERROR: remove_expired_obj (non-current expiration) " |
f6b5b4d7 TL |
1147 | << oc.bucket_info.bucket << ":" << o.key |
1148 | << " " << cpp_strerror(r) | |
1149 | << " " << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
1150 | return r; |
1151 | } | |
f6b5b4d7 TL |
1152 | if (perfcounter) { |
1153 | perfcounter->inc(l_rgw_lc_expire_noncurrent, 1); | |
1154 | } | |
1155 | ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key | |
1156 | << " (non-current expiration) " | |
1157 | << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
1158 | return 0; |
1159 | } | |
1160 | }; | |
1161 | ||
1162 | class LCOpAction_DMExpiration : public LCOpAction { | |
1163 | public: | |
f6b5b4d7 TL |
1164 | LCOpAction_DMExpiration(op_env& env) {} |
1165 | ||
11fdf7f2 TL |
1166 | bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override { |
1167 | auto& o = oc.o; | |
1168 | if (!o.is_delete_marker()) { | |
f6b5b4d7 TL |
1169 | ldout(oc.cct, 20) << __func__ << "(): key=" << o.key |
1170 | << ": not a delete marker, skipping " | |
1171 | << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
1172 | return false; |
1173 | } | |
f6b5b4d7 TL |
1174 | if (oc.next_has_same_name(o.key.name)) { |
1175 | ldout(oc.cct, 20) << __func__ << "(): key=" << o.key | |
1176 | << ": next is same object, skipping " | |
1177 | << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
1178 | return false; |
1179 | } | |
1180 | ||
1181 | *exp_time = real_clock::now(); | |
1182 | ||
1183 | return true; | |
1184 | } | |
1185 | ||
1186 | int process(lc_op_ctx& oc) { | |
1187 | auto& o = oc.o; | |
1188 | int r = remove_expired_obj(oc, true); | |
1189 | if (r < 0) { | |
9f95a23c | 1190 | ldout(oc.cct, 0) << "ERROR: remove_expired_obj (delete marker expiration) " |
f6b5b4d7 TL |
1191 | << oc.bucket_info.bucket << ":" << o.key |
1192 | << " " << cpp_strerror(r) | |
1193 | << " " << oc.wq->thr_name() | |
1194 | << dendl; | |
11fdf7f2 TL |
1195 | return r; |
1196 | } | |
f6b5b4d7 TL |
1197 | if (perfcounter) { |
1198 | perfcounter->inc(l_rgw_lc_expire_dm, 1); | |
1199 | } | |
1200 | ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key | |
1201 | << " (delete marker expiration) " | |
1202 | << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
1203 | return 0; |
1204 | } | |
1205 | }; | |
1206 | ||
1207 | class LCOpAction_Transition : public LCOpAction { | |
1208 | const transition_action& transition; | |
1209 | bool need_to_process{false}; | |
1210 | ||
1211 | protected: | |
1212 | virtual bool check_current_state(bool is_current) = 0; | |
1213 | virtual ceph::real_time get_effective_mtime(lc_op_ctx& oc) = 0; | |
1214 | public: | |
e306af50 TL |
1215 | LCOpAction_Transition(const transition_action& _transition) |
1216 | : transition(_transition) {} | |
11fdf7f2 TL |
1217 | |
1218 | bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override { | |
1219 | auto& o = oc.o; | |
1220 | ||
1221 | if (o.is_delete_marker()) { | |
1222 | return false; | |
1223 | } | |
1224 | ||
1225 | if (!check_current_state(o.is_current())) { | |
1226 | return false; | |
1227 | } | |
1228 | ||
1229 | auto mtime = get_effective_mtime(oc); | |
1230 | bool is_expired; | |
9f95a23c | 1231 | if (transition.days < 0) { |
11fdf7f2 | 1232 | if (transition.date == boost::none) { |
f6b5b4d7 TL |
1233 | ldout(oc.cct, 20) << __func__ << "(): key=" << o.key |
1234 | << ": no transition day/date set in rule, skipping " | |
1235 | << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
1236 | return false; |
1237 | } | |
e306af50 TL |
1238 | is_expired = ceph_clock_now() >= |
1239 | ceph::real_clock::to_time_t(*transition.date); | |
11fdf7f2 TL |
1240 | *exp_time = *transition.date; |
1241 | } else { | |
1242 | is_expired = obj_has_expired(oc.cct, mtime, transition.days, exp_time); | |
1243 | } | |
1244 | ||
f6b5b4d7 TL |
1245 | ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" |
1246 | << is_expired << " " | |
1247 | << oc.wq->thr_name() << dendl; | |
11fdf7f2 | 1248 | |
e306af50 TL |
1249 | need_to_process = |
1250 | (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) != | |
1251 | transition.storage_class); | |
11fdf7f2 TL |
1252 | |
1253 | return is_expired; | |
1254 | } | |
1255 | ||
1256 | bool should_process() override { | |
1257 | return need_to_process; | |
1258 | } | |
1259 | ||
1260 | int process(lc_op_ctx& oc) { | |
1261 | auto& o = oc.o; | |
1262 | ||
1263 | rgw_placement_rule target_placement; | |
1264 | target_placement.inherit_from(oc.bucket_info.placement_rule); | |
1265 | target_placement.storage_class = transition.storage_class; | |
1266 | ||
e306af50 TL |
1267 | if (!oc.store->svc()->zone->get_zone_params(). |
1268 | valid_placement(target_placement)) { | |
f6b5b4d7 TL |
1269 | ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " |
1270 | << target_placement | |
9f95a23c | 1271 | << " bucket="<< oc.bucket_info.bucket |
f6b5b4d7 TL |
1272 | << " rule_id=" << oc.op.id |
1273 | << " " << oc.wq->thr_name() << dendl; | |
eafe8130 TL |
1274 | return -EINVAL; |
1275 | } | |
1276 | ||
e306af50 TL |
1277 | int r = oc.store->getRados()->transition_obj( |
1278 | oc.rctx, oc.bucket_info, oc.obj, target_placement, o.meta.mtime, | |
1279 | o.versioned_epoch, oc.dpp, null_yield); | |
11fdf7f2 | 1280 | if (r < 0) { |
9f95a23c | 1281 | ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj " |
f6b5b4d7 TL |
1282 | << oc.bucket_info.bucket << ":" << o.key |
1283 | << " -> " << transition.storage_class | |
1284 | << " " << cpp_strerror(r) | |
1285 | << " " << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
1286 | return r; |
1287 | } | |
f6b5b4d7 TL |
1288 | ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket |
1289 | << ":" << o.key << " -> " | |
1290 | << transition.storage_class | |
1291 | << " " << oc.wq->thr_name() << dendl; | |
11fdf7f2 TL |
1292 | return 0; |
1293 | } | |
1294 | }; | |
1295 | ||
1296 | class LCOpAction_CurrentTransition : public LCOpAction_Transition { | |
1297 | protected: | |
1298 | bool check_current_state(bool is_current) override { | |
1299 | return is_current; | |
1300 | } | |
1301 | ||
1302 | ceph::real_time get_effective_mtime(lc_op_ctx& oc) override { | |
1303 | return oc.o.meta.mtime; | |
1304 | } | |
1305 | public: | |
e306af50 TL |
1306 | LCOpAction_CurrentTransition(const transition_action& _transition) |
1307 | : LCOpAction_Transition(_transition) {} | |
f6b5b4d7 TL |
1308 | int process(lc_op_ctx& oc) { |
1309 | int r = LCOpAction_Transition::process(oc); | |
1310 | if (r == 0) { | |
1311 | if (perfcounter) { | |
1312 | perfcounter->inc(l_rgw_lc_transition_current, 1); | |
1313 | } | |
1314 | } | |
1315 | return r; | |
1316 | } | |
11fdf7f2 TL |
1317 | }; |
1318 | ||
1319 | class LCOpAction_NonCurrentTransition : public LCOpAction_Transition { | |
1320 | protected: | |
1321 | bool check_current_state(bool is_current) override { | |
1322 | return !is_current; | |
1323 | } | |
1324 | ||
1325 | ceph::real_time get_effective_mtime(lc_op_ctx& oc) override { | |
f6b5b4d7 | 1326 | return oc.effective_mtime; |
11fdf7f2 TL |
1327 | } |
1328 | public: | |
f6b5b4d7 TL |
1329 | LCOpAction_NonCurrentTransition(op_env& env, |
1330 | const transition_action& _transition) | |
1331 | : LCOpAction_Transition(_transition) | |
1332 | {} | |
1333 | int process(lc_op_ctx& oc) { | |
1334 | int r = LCOpAction_Transition::process(oc); | |
1335 | if (r == 0) { | |
1336 | if (perfcounter) { | |
1337 | perfcounter->inc(l_rgw_lc_transition_noncurrent, 1); | |
1338 | } | |
1339 | } | |
1340 | return r; | |
1341 | } | |
11fdf7f2 TL |
1342 | }; |
1343 | ||
1344 | void LCOpRule::build() | |
1345 | { | |
1346 | filters.emplace_back(new LCOpFilter_Tags); | |
1347 | ||
1348 | auto& op = env.op; | |
1349 | ||
1350 | if (op.expiration > 0 || | |
1351 | op.expiration_date != boost::none) { | |
f6b5b4d7 | 1352 | actions.emplace_back(new LCOpAction_CurrentExpiration(env)); |
11fdf7f2 TL |
1353 | } |
1354 | ||
1355 | if (op.dm_expiration) { | |
f6b5b4d7 | 1356 | actions.emplace_back(new LCOpAction_DMExpiration(env)); |
11fdf7f2 TL |
1357 | } |
1358 | ||
1359 | if (op.noncur_expiration > 0) { | |
f6b5b4d7 | 1360 | actions.emplace_back(new LCOpAction_NonCurrentExpiration(env)); |
11fdf7f2 TL |
1361 | } |
1362 | ||
1363 | for (auto& iter : op.transitions) { | |
1364 | actions.emplace_back(new LCOpAction_CurrentTransition(iter.second)); | |
1365 | } | |
1366 | ||
1367 | for (auto& iter : op.noncur_transitions) { | |
f6b5b4d7 | 1368 | actions.emplace_back(new LCOpAction_NonCurrentTransition(env, iter.second)); |
11fdf7f2 TL |
1369 | } |
1370 | } | |
1371 | ||
f6b5b4d7 | 1372 | void LCOpRule::update() |
11fdf7f2 | 1373 | { |
f6b5b4d7 TL |
1374 | next_key_name = env.ol.next_key_name(); |
1375 | effective_mtime = env.ol.get_prev_obj().meta.mtime; | |
1376 | } | |
11fdf7f2 | 1377 | |
f6b5b4d7 TL |
1378 | int LCOpRule::process(rgw_bucket_dir_entry& o, |
1379 | const DoutPrefixProvider *dpp, | |
1380 | WorkQ* wq) | |
1381 | { | |
1382 | lc_op_ctx ctx(env, o, next_key_name, effective_mtime, dpp, wq); | |
1383 | shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing | |
11fdf7f2 TL |
1384 | real_time exp; |
1385 | ||
1386 | for (auto& a : actions) { | |
1387 | real_time action_exp; | |
1388 | ||
1389 | if (a->check(ctx, &action_exp)) { | |
1390 | if (action_exp > exp) { | |
1391 | exp = action_exp; | |
1392 | selected = &a; | |
1393 | } | |
1394 | } | |
1395 | } | |
1396 | ||
1397 | if (selected && | |
1398 | (*selected)->should_process()) { | |
1399 | ||
1400 | /* | |
1401 | * Calling filter checks after action checks because | |
1402 | * all action checks (as they are implemented now) do | |
1403 | * not access the objects themselves, but return result | |
1404 | * from info from bucket index listing. The current tags filter | |
1405 | * check does access the objects, so we avoid unnecessary rados calls | |
1406 | * having filters check later in the process. | |
1407 | */ | |
1408 | ||
1409 | bool cont = false; | |
1410 | for (auto& f : filters) { | |
1411 | if (f->check(ctx)) { | |
1412 | cont = true; | |
1413 | break; | |
1414 | } | |
1415 | } | |
1416 | ||
1417 | if (!cont) { | |
f6b5b4d7 TL |
1418 | ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key |
1419 | << ": no rule match, skipping " | |
1420 | << " " << wq->thr_name() << dendl; | |
11fdf7f2 TL |
1421 | return 0; |
1422 | } | |
1423 | ||
1424 | int r = (*selected)->process(ctx); | |
1425 | if (r < 0) { | |
9f95a23c | 1426 | ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj " |
f6b5b4d7 TL |
1427 | << env.bucket_info.bucket << ":" << o.key |
1428 | << " " << cpp_strerror(r) | |
1429 | << " " << wq->thr_name() << dendl; | |
11fdf7f2 TL |
1430 | return r; |
1431 | } | |
f6b5b4d7 TL |
1432 | ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":" |
1433 | << o.key << " " << wq->thr_name() << dendl; | |
11fdf7f2 TL |
1434 | } |
1435 | ||
1436 | return 0; | |
1437 | ||
1438 | } | |
1439 | ||
f6b5b4d7 TL |
1440 | int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, |
1441 | time_t stop_at, bool once) | |
7c673cae FG |
1442 | { |
1443 | RGWLifecycleConfiguration config(cct); | |
1444 | RGWBucketInfo bucket_info; | |
1445 | map<string, bufferlist> bucket_attrs; | |
11fdf7f2 | 1446 | string no_ns, list_versions; |
7c673cae | 1447 | vector<rgw_bucket_dir_entry> objs; |
7c673cae FG |
1448 | vector<std::string> result; |
1449 | boost::split(result, shard_id, boost::is_any_of(":")); | |
1450 | string bucket_tenant = result[0]; | |
1451 | string bucket_name = result[1]; | |
11fdf7f2 | 1452 | string bucket_marker = result[2]; |
e306af50 TL |
1453 | int ret = store->getRados()->get_bucket_info( |
1454 | store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield, | |
1455 | &bucket_attrs); | |
7c673cae | 1456 | if (ret < 0) { |
f6b5b4d7 TL |
1457 | ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name |
1458 | << " failed" << dendl; | |
7c673cae FG |
1459 | return ret; |
1460 | } | |
1461 | ||
f6b5b4d7 TL |
1462 | auto stack_guard = make_scope_guard( |
1463 | [&worker, &bucket_info] | |
1464 | { | |
1465 | worker->workpool->drain(); | |
1466 | } | |
1467 | ); | |
1468 | ||
11fdf7f2 | 1469 | if (bucket_info.bucket.marker != bucket_marker) { |
f6b5b4d7 TL |
1470 | ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" |
1471 | << bucket_tenant << ":" << bucket_name | |
1472 | << " cur_marker=" << bucket_info.bucket.marker | |
11fdf7f2 | 1473 | << " orig_marker=" << bucket_marker << dendl; |
7c673cae FG |
1474 | return -ENOENT; |
1475 | } | |
1476 | ||
9f95a23c | 1477 | RGWRados::Bucket target(store->getRados(), bucket_info); |
7c673cae FG |
1478 | |
1479 | map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC); | |
1480 | if (aiter == bucket_attrs.end()) | |
1481 | return 0; | |
1482 | ||
11fdf7f2 | 1483 | bufferlist::const_iterator iter{&aiter->second}; |
7c673cae FG |
1484 | try { |
1485 | config.decode(iter); | |
1486 | } catch (const buffer::error& e) { | |
f6b5b4d7 TL |
1487 | ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed" |
1488 | << dendl; | |
7c673cae FG |
1489 | return -1; |
1490 | } | |
1491 | ||
f6b5b4d7 | 1492 | auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) { |
e306af50 | 1493 | auto wt = |
f6b5b4d7 | 1494 | boost::get<std::tuple<LCOpRule, rgw_bucket_dir_entry>>(wi); |
e306af50 | 1495 | auto& [op_rule, o] = wt; |
f6b5b4d7 | 1496 | |
e306af50 | 1497 | ldpp_dout(wk->get_lc(), 20) |
f6b5b4d7 TL |
1498 | << __func__ << "(): key=" << o.key << wq->thr_name() |
1499 | << dendl; | |
1500 | int ret = op_rule.process(o, wk->dpp, wq); | |
e306af50 TL |
1501 | if (ret < 0) { |
1502 | ldpp_dout(wk->get_lc(), 20) | |
1503 | << "ERROR: orule.process() returned ret=" << ret | |
f6b5b4d7 | 1504 | << wq->thr_name() |
e306af50 TL |
1505 | << dendl; |
1506 | } | |
1507 | }; | |
1508 | worker->workpool->setf(pf); | |
494da23a | 1509 | |
e306af50 | 1510 | multimap<string, lc_op>& prefix_map = config.get_prefix_map(); |
494da23a TL |
1511 | ldpp_dout(this, 10) << __func__ << "() prefix_map size=" |
1512 | << prefix_map.size() | |
1513 | << dendl; | |
1514 | ||
11fdf7f2 TL |
1515 | rgw_obj_key pre_marker; |
1516 | rgw_obj_key next_marker; | |
e306af50 TL |
1517 | for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); |
1518 | ++prefix_iter) { | |
f6b5b4d7 TL |
1519 | |
1520 | if (worker_should_stop(stop_at, once)) { | |
1521 | ldout(cct, 5) << __func__ << " interval budget EXPIRED worker " | |
1522 | << worker->ix | |
1523 | << dendl; | |
1524 | return 0; | |
1525 | } | |
1526 | ||
11fdf7f2 TL |
1527 | auto& op = prefix_iter->second; |
1528 | if (!is_valid_op(op)) { | |
1529 | continue; | |
1530 | } | |
f6b5b4d7 TL |
1531 | ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first |
1532 | << dendl; | |
11fdf7f2 | 1533 | if (prefix_iter != prefix_map.begin() && |
e306af50 TL |
1534 | (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), |
1535 | prev(prefix_iter)->first) == 0)) { | |
11fdf7f2 TL |
1536 | next_marker = pre_marker; |
1537 | } else { | |
1538 | pre_marker = next_marker; | |
1539 | } | |
494da23a TL |
1540 | |
1541 | LCObjsLister ol(store, bucket_info); | |
11fdf7f2 | 1542 | ol.set_prefix(prefix_iter->first); |
7c673cae | 1543 | |
11fdf7f2 | 1544 | ret = ol.init(); |
11fdf7f2 TL |
1545 | if (ret < 0) { |
1546 | if (ret == (-ENOENT)) | |
1547 | return 0; | |
1548 | ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl; | |
1549 | return ret; | |
7c673cae | 1550 | } |
7c673cae | 1551 | |
e306af50 | 1552 | op_env oenv(op, store, worker, bucket_info, ol); |
11fdf7f2 | 1553 | LCOpRule orule(oenv); |
e306af50 | 1554 | orule.build(); // why can't ctor do it? |
e306af50 TL |
1555 | rgw_bucket_dir_entry* o{nullptr}; |
1556 | for (; ol.get_obj(&o /* , fetch_barrier */); ol.next()) { | |
f6b5b4d7 TL |
1557 | orule.update(); |
1558 | std::tuple<LCOpRule, rgw_bucket_dir_entry> t1 = {orule, *o}; | |
e306af50 | 1559 | worker->workpool->enqueue(WorkItem{t1}); |
7c673cae | 1560 | } |
e306af50 | 1561 | worker->workpool->drain(); |
7c673cae FG |
1562 | } |
1563 | ||
f6b5b4d7 | 1564 | ret = handle_multipart_expiration(&target, prefix_map, worker, stop_at, once); |
7c673cae FG |
1565 | return ret; |
1566 | } | |
1567 | ||
e306af50 | 1568 | int RGWLC::bucket_lc_post(int index, int max_lock_sec, |
f6b5b4d7 | 1569 | cls_rgw_lc_entry& entry, int& result, |
e306af50 | 1570 | LCWorker* worker) |
7c673cae FG |
1571 | { |
1572 | utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0); | |
1573 | ||
1574 | rados::cls::lock::Lock l(lc_index_lock_name); | |
1575 | l.set_cookie(cookie); | |
1576 | l.set_duration(lock_duration); | |
1577 | ||
f6b5b4d7 TL |
1578 | dout(5) << "RGWLC::bucket_lc_post(): POST " << entry |
1579 | << " index: " << index << " worker ix: " << worker->ix | |
1580 | << dendl; | |
1581 | ||
7c673cae | 1582 | do { |
e306af50 TL |
1583 | int ret = l.lock_exclusive( |
1584 | &store->getRados()->lc_pool_ctx, obj_names[index]); | |
f6b5b4d7 TL |
1585 | if (ret == -EBUSY || ret == -EEXIST) { |
1586 | /* already locked by another lc processor */ | |
11fdf7f2 | 1587 | ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on " |
f6b5b4d7 | 1588 | << obj_names[index] << ", sleep 5, try again " << dendl; |
7c673cae FG |
1589 | sleep(5); |
1590 | continue; | |
1591 | } | |
1592 | if (ret < 0) | |
1593 | return 0; | |
f6b5b4d7 TL |
1594 | ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] |
1595 | << dendl; | |
7c673cae | 1596 | if (result == -ENOENT) { |
e306af50 TL |
1597 | ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx, |
1598 | obj_names[index], entry); | |
7c673cae | 1599 | if (ret < 0) { |
11fdf7f2 TL |
1600 | ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry " |
1601 | << obj_names[index] << dendl; | |
7c673cae | 1602 | } |
31f18b77 | 1603 | goto clean; |
7c673cae | 1604 | } else if (result < 0) { |
f6b5b4d7 | 1605 | entry.status = lc_failed; |
7c673cae | 1606 | } else { |
f6b5b4d7 | 1607 | entry.status = lc_complete; |
7c673cae FG |
1608 | } |
1609 | ||
e306af50 TL |
1610 | ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, |
1611 | obj_names[index], entry); | |
7c673cae | 1612 | if (ret < 0) { |
11fdf7f2 TL |
1613 | ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on " |
1614 | << obj_names[index] << dendl; | |
7c673cae FG |
1615 | } |
1616 | clean: | |
9f95a23c | 1617 | l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]); |
f6b5b4d7 TL |
1618 | ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock " |
1619 | << obj_names[index] << dendl; | |
7c673cae FG |
1620 | return 0; |
1621 | } while (true); | |
1622 | } | |
1623 | ||
f6b5b4d7 TL |
1624 | int RGWLC::list_lc_progress(string& marker, uint32_t max_entries, |
1625 | vector<cls_rgw_lc_entry>& progress_map, | |
1626 | int& index) | |
7c673cae | 1627 | { |
f6b5b4d7 TL |
1628 | progress_map.clear(); |
1629 | for(; index < max_objs; index++, marker="") { | |
1630 | vector<cls_rgw_lc_entry> entries; | |
e306af50 TL |
1631 | int ret = |
1632 | cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker, | |
1633 | max_entries, entries); | |
31f18b77 FG |
1634 | if (ret < 0) { |
1635 | if (ret == -ENOENT) { | |
11fdf7f2 | 1636 | ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object=" |
31f18b77 FG |
1637 | << obj_names[index] << dendl; |
1638 | continue; | |
1639 | } else { | |
1640 | return ret; | |
1641 | } | |
1642 | } | |
f6b5b4d7 TL |
1643 | progress_map.reserve(progress_map.size() + entries.size()); |
1644 | progress_map.insert(progress_map.end(), entries.begin(), entries.end()); | |
1645 | ||
1646 | /* update index, marker tuple */ | |
1647 | if (progress_map.size() > 0) | |
1648 | marker = progress_map.back().bucket; | |
1649 | ||
1650 | if (progress_map.size() >= max_entries) | |
1651 | break; | |
7c673cae FG |
1652 | } |
1653 | return 0; | |
1654 | } | |
1655 | ||
/*
 * Return the integers 0 .. n-1 in random order (empty for n == 0).
 *
 * The vector holds exactly n values so that the last index is part of
 * the sequence, and std::shuffle with a locally seeded engine replaces
 * std::random_shuffle, which C++17 removed.
 */
static inline std::vector<int> random_sequence(uint32_t n)
{
  std::vector<int> v(n, 0);
  std::generate(v.begin(), v.end(),
    [ix = 0]() mutable {
      return ix++;
    });
  std::random_device rd;
  std::default_random_engine rng{rd()};
  std::shuffle(v.begin(), v.end(), rng);
  return v;
}
7c673cae | 1666 | |
f6b5b4d7 | 1667 | int RGWLC::process(LCWorker* worker, bool once = false) |
e306af50 TL |
1668 | { |
1669 | int max_secs = cct->_conf->rgw_lc_lock_max_time; | |
7c673cae | 1670 | |
e306af50 TL |
1671 | /* generate an index-shard sequence unrelated to any other |
1672 | * that might be running in parallel */ | |
1673 | vector<int> shard_seq = random_sequence(max_objs); | |
1674 | for (auto index : shard_seq) { | |
f6b5b4d7 | 1675 | int ret = process(index, max_secs, worker, once); |
7c673cae FG |
1676 | if (ret < 0) |
1677 | return ret; | |
1678 | } | |
1679 | ||
1680 | return 0; | |
1681 | } | |
1682 | ||
f6b5b4d7 | 1683 | bool RGWLC::expired_session(time_t started) |
7c673cae | 1684 | { |
f6b5b4d7 TL |
1685 | time_t interval = (cct->_conf->rgw_lc_debug_interval > 0) |
1686 | ? cct->_conf->rgw_lc_debug_interval | |
1687 | : 24*60*60; | |
1688 | ||
1689 | auto now = time(nullptr); | |
1690 | ||
1691 | dout(16) << "RGWLC::expired_session" | |
1692 | << " started: " << started | |
1693 | << " interval: " << interval << "(*2==" << 2*interval << ")" | |
1694 | << " now: " << now | |
1695 | << dendl; | |
1696 | ||
1697 | return (started + 2*interval < now); | |
1698 | } | |
1699 | ||
1700 | time_t RGWLC::thread_stop_at() | |
1701 | { | |
1702 | uint64_t interval = (cct->_conf->rgw_lc_debug_interval > 0) | |
1703 | ? cct->_conf->rgw_lc_debug_interval | |
1704 | : 24*60*60; | |
1705 | ||
1706 | return time(nullptr) + interval; | |
1707 | } | |
1708 | ||
1709 | int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, | |
1710 | bool once = false) | |
1711 | { | |
1712 | dout(5) << "RGWLC::process(): ENTER: " | |
1713 | << "index: " << index << " worker ix: " << worker->ix | |
1714 | << dendl; | |
1715 | ||
7c673cae FG |
1716 | rados::cls::lock::Lock l(lc_index_lock_name); |
1717 | do { | |
1718 | utime_t now = ceph_clock_now(); | |
f6b5b4d7 TL |
1719 | //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS |
1720 | cls_rgw_lc_entry entry; | |
7c673cae FG |
1721 | if (max_lock_secs <= 0) |
1722 | return -EAGAIN; | |
1723 | ||
1724 | utime_t time(max_lock_secs, 0); | |
1725 | l.set_duration(time); | |
1726 | ||
e306af50 TL |
1727 | int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx, |
1728 | obj_names[index]); | |
f6b5b4d7 TL |
1729 | if (ret == -EBUSY || ret == -EEXIST) { |
1730 | /* already locked by another lc processor */ | |
11fdf7f2 TL |
1731 | ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on " |
1732 | << obj_names[index] << ", sleep 5, try again" << dendl; | |
7c673cae FG |
1733 | sleep(5); |
1734 | continue; | |
1735 | } | |
1736 | if (ret < 0) | |
1737 | return 0; | |
1738 | ||
7c673cae | 1739 | cls_rgw_lc_obj_head head; |
e306af50 TL |
1740 | ret = cls_rgw_lc_get_head(store->getRados()->lc_pool_ctx, obj_names[index], |
1741 | head); | |
7c673cae | 1742 | if (ret < 0) { |
11fdf7f2 TL |
1743 | ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head " |
1744 | << obj_names[index] << ", ret=" << ret << dendl; | |
7c673cae FG |
1745 | goto exit; |
1746 | } | |
1747 | ||
f6b5b4d7 TL |
1748 | if (! (cct->_conf->rgw_lc_lock_max_time == 9969)) { |
1749 | ret = cls_rgw_lc_get_entry(store->getRados()->lc_pool_ctx, | |
1750 | obj_names[index], head.marker, entry); | |
1751 | if (ret >= 0) { | |
1752 | if (entry.status == lc_processing) { | |
1753 | if (expired_session(entry.start_time)) { | |
1754 | dout(5) << "RGWLC::process(): STALE lc session found for: " << entry | |
1755 | << " index: " << index << " worker ix: " << worker->ix | |
1756 | << " (clearing)" | |
1757 | << dendl; | |
1758 | } else { | |
1759 | dout(5) << "RGWLC::process(): ACTIVE entry: " << entry | |
1760 | << " index: " << index << " worker ix: " << worker->ix | |
1761 | << dendl; | |
1762 | goto exit; | |
1763 | } | |
1764 | } | |
1765 | } | |
1766 | } | |
1767 | ||
1768 | if(!if_already_run_today(head.start_date) || | |
1769 | once) { | |
7c673cae FG |
1770 | head.start_date = now; |
1771 | head.marker.clear(); | |
e306af50 | 1772 | ret = bucket_lc_prepare(index, worker); |
7c673cae | 1773 | if (ret < 0) { |
11fdf7f2 | 1774 | ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object " |
e306af50 TL |
1775 | << obj_names[index] |
1776 | << ", ret=" << ret | |
1777 | << dendl; | |
7c673cae FG |
1778 | goto exit; |
1779 | } | |
1780 | } | |
1781 | ||
e306af50 TL |
1782 | ret = cls_rgw_lc_get_next_entry(store->getRados()->lc_pool_ctx, |
1783 | obj_names[index], head.marker, entry); | |
7c673cae | 1784 | if (ret < 0) { |
11fdf7f2 TL |
1785 | ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry " |
1786 | << obj_names[index] << dendl; | |
7c673cae FG |
1787 | goto exit; |
1788 | } | |
1789 | ||
e306af50 | 1790 | /* termination condition (eof) */ |
f6b5b4d7 | 1791 | if (entry.bucket.empty()) |
7c673cae FG |
1792 | goto exit; |
1793 | ||
f6b5b4d7 TL |
1794 | ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry |
1795 | << " index: " << index << " worker ix: " << worker->ix | |
1796 | << dendl; | |
1797 | ||
1798 | entry.status = lc_processing; | |
e306af50 | 1799 | ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, |
f6b5b4d7 | 1800 | obj_names[index], entry); |
7c673cae | 1801 | if (ret < 0) { |
e306af50 | 1802 | ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry " |
f6b5b4d7 | 1803 | << obj_names[index] << entry.bucket << entry.status << dendl; |
7c673cae FG |
1804 | goto exit; |
1805 | } | |
1806 | ||
f6b5b4d7 TL |
1807 | head.marker = entry.bucket; |
1808 | ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx, | |
1809 | obj_names[index], head); | |
7c673cae | 1810 | if (ret < 0) { |
e306af50 TL |
1811 | ldpp_dout(this, 0) << "RGWLC::process() failed to put head " |
1812 | << obj_names[index] | |
f6b5b4d7 | 1813 | << dendl; |
7c673cae FG |
1814 | goto exit; |
1815 | } | |
f6b5b4d7 TL |
1816 | |
1817 | ldpp_dout(this, 5) << "RGWLC::process(): START entry 2: " << entry | |
1818 | << " index: " << index << " worker ix: " << worker->ix | |
1819 | << dendl; | |
1820 | ||
9f95a23c | 1821 | l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]); |
f6b5b4d7 | 1822 | ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once); |
e306af50 | 1823 | bucket_lc_post(index, max_lock_secs, entry, ret, worker); |
f6b5b4d7 TL |
1824 | } while(1 && !once); |
1825 | ||
1826 | return 0; | |
3efd9988 | 1827 | |
7c673cae | 1828 | exit: |
f6b5b4d7 TL |
1829 | l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]); |
1830 | return 0; | |
7c673cae FG |
1831 | } |
1832 | ||
1833 | void RGWLC::start_processor() | |
1834 | { | |
e306af50 TL |
1835 | auto maxw = cct->_conf->rgw_lc_max_worker; |
1836 | workers.reserve(maxw); | |
1837 | for (int ix = 0; ix < maxw; ++ix) { | |
1838 | auto worker = | |
f6b5b4d7 | 1839 | std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this, ix); |
e306af50 TL |
1840 | worker->create((string{"lifecycle_thr_"} + to_string(ix)).c_str()); |
1841 | workers.emplace_back(std::move(worker)); | |
1842 | } | |
7c673cae FG |
1843 | } |
1844 | ||
1845 | void RGWLC::stop_processor() | |
1846 | { | |
1847 | down_flag = true; | |
e306af50 | 1848 | for (auto& worker : workers) { |
7c673cae FG |
1849 | worker->stop(); |
1850 | worker->join(); | |
1851 | } | |
e306af50 | 1852 | workers.clear(); |
7c673cae FG |
1853 | } |
1854 | ||
11fdf7f2 TL |
1855 | unsigned RGWLC::get_subsys() const |
1856 | { | |
1857 | return dout_subsys; | |
1858 | } | |
1859 | ||
1860 | std::ostream& RGWLC::gen_prefix(std::ostream& out) const | |
1861 | { | |
1862 | return out << "lifecycle: "; | |
1863 | } | |
1864 | ||
7c673cae FG |
1865 | void RGWLC::LCWorker::stop() |
1866 | { | |
9f95a23c TL |
1867 | std::lock_guard l{lock}; |
1868 | cond.notify_all(); | |
7c673cae FG |
1869 | } |
1870 | ||
1871 | bool RGWLC::going_down() | |
1872 | { | |
1873 | return down_flag; | |
1874 | } | |
1875 | ||
1876 | bool RGWLC::LCWorker::should_work(utime_t& now) | |
1877 | { | |
1878 | int start_hour; | |
1879 | int start_minute; | |
1880 | int end_hour; | |
1881 | int end_minute; | |
1882 | string worktime = cct->_conf->rgw_lifecycle_work_time; | |
e306af50 TL |
1883 | sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, |
1884 | &end_hour, &end_minute); | |
7c673cae FG |
1885 | struct tm bdt; |
1886 | time_t tt = now.sec(); | |
1887 | localtime_r(&tt, &bdt); | |
1888 | ||
1889 | if (cct->_conf->rgw_lc_debug_interval > 0) { | |
1890 | /* We're debugging, so say we can run */ | |
1891 | return true; | |
1892 | } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) && | |
1893 | (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) { | |
1894 | return true; | |
1895 | } else { | |
1896 | return false; | |
1897 | } | |
1898 | ||
1899 | } | |
1900 | ||
1901 | int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now) | |
1902 | { | |
11fdf7f2 TL |
1903 | int secs; |
1904 | ||
7c673cae | 1905 | if (cct->_conf->rgw_lc_debug_interval > 0) { |
11fdf7f2 | 1906 | secs = start + cct->_conf->rgw_lc_debug_interval - now; |
7c673cae FG |
1907 | if (secs < 0) |
1908 | secs = 0; | |
1909 | return (secs); | |
1910 | } | |
1911 | ||
1912 | int start_hour; | |
1913 | int start_minute; | |
1914 | int end_hour; | |
1915 | int end_minute; | |
1916 | string worktime = cct->_conf->rgw_lifecycle_work_time; | |
e306af50 TL |
1917 | sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, |
1918 | &end_minute); | |
7c673cae FG |
1919 | struct tm bdt; |
1920 | time_t tt = now.sec(); | |
1921 | time_t nt; | |
1922 | localtime_r(&tt, &bdt); | |
1923 | bdt.tm_hour = start_hour; | |
1924 | bdt.tm_min = start_minute; | |
1925 | bdt.tm_sec = 0; | |
1926 | nt = mktime(&bdt); | |
11fdf7f2 TL |
1927 | secs = nt - tt; |
1928 | ||
1929 | return secs>0 ? secs : secs+24*60*60; | |
1930 | } | |
1931 | ||
e306af50 TL |
1932 | RGWLC::LCWorker::~LCWorker() |
1933 | { | |
e306af50 TL |
1934 | delete workpool; |
1935 | } /* ~LCWorker */ | |
1936 | ||
1937 | void RGWLifecycleConfiguration::generate_test_instances( | |
1938 | list<RGWLifecycleConfiguration*>& o) | |
11fdf7f2 TL |
1939 | { |
1940 | o.push_back(new RGWLifecycleConfiguration); | |
1941 | } | |
1942 | ||
f6b5b4d7 TL |
1943 | static inline void get_lc_oid(CephContext *cct, |
1944 | const string& shard_id, string *oid) | |
11fdf7f2 | 1945 | { |
e306af50 TL |
1946 | int max_objs = |
1947 | (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME : | |
1948 | cct->_conf->rgw_lc_max_objs); | |
f6b5b4d7 | 1949 | /* n.b. review hash algo */ |
e306af50 TL |
1950 | int index = ceph_str_hash_linux(shard_id.c_str(), |
1951 | shard_id.size()) % HASH_PRIME % max_objs; | |
11fdf7f2 TL |
1952 | *oid = lc_oid_prefix; |
1953 | char buf[32]; | |
1954 | snprintf(buf, 32, ".%d", index); | |
1955 | oid->append(buf); | |
1956 | return; | |
1957 | } | |
1958 | ||
11fdf7f2 TL |
1959 | static std::string get_lc_shard_name(const rgw_bucket& bucket){ |
1960 | return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker); | |
1961 | } | |
1962 | ||
1963 | template<typename F> | |
f6b5b4d7 TL |
1964 | static int guard_lc_modify(rgw::sal::RGWRadosStore* store, |
1965 | const rgw_bucket& bucket, const string& cookie, | |
1966 | const F& f) { | |
11fdf7f2 TL |
1967 | CephContext *cct = store->ctx(); |
1968 | ||
1969 | string shard_id = get_lc_shard_name(bucket); | |
1970 | ||
1971 | string oid; | |
1972 | get_lc_oid(cct, shard_id, &oid); | |
1973 | ||
f6b5b4d7 TL |
1974 | /* XXX it makes sense to take shard_id for a bucket_id? */ |
1975 | cls_rgw_lc_entry entry; | |
1976 | entry.bucket = shard_id; | |
1977 | entry.status = lc_uninitial; | |
11fdf7f2 | 1978 | int max_lock_secs = cct->_conf->rgw_lc_lock_max_time; |
7c673cae | 1979 | |
11fdf7f2 TL |
1980 | rados::cls::lock::Lock l(lc_index_lock_name); |
1981 | utime_t time(max_lock_secs, 0); | |
1982 | l.set_duration(time); | |
1983 | l.set_cookie(cookie); | |
1984 | ||
9f95a23c | 1985 | librados::IoCtx *ctx = store->getRados()->get_lc_pool_ctx(); |
11fdf7f2 TL |
1986 | int ret; |
1987 | ||
1988 | do { | |
1989 | ret = l.lock_exclusive(ctx, oid); | |
1990 | if (ret == -EBUSY || ret == -EEXIST) { | |
1991 | ldout(cct, 0) << "RGWLC::RGWPutLC() failed to acquire lock on " | |
1992 | << oid << ", sleep 5, try again" << dendl; | |
1993 | sleep(5); // XXX: return retryable error | |
1994 | continue; | |
1995 | } | |
1996 | if (ret < 0) { | |
1997 | ldout(cct, 0) << "RGWLC::RGWPutLC() failed to acquire lock on " | |
1998 | << oid << ", ret=" << ret << dendl; | |
1999 | break; | |
2000 | } | |
2001 | ret = f(ctx, oid, entry); | |
2002 | if (ret < 0) { | |
2003 | ldout(cct, 0) << "RGWLC::RGWPutLC() failed to set entry on " | |
2004 | << oid << ", ret=" << ret << dendl; | |
2005 | } | |
2006 | break; | |
2007 | } while(true); | |
2008 | l.unlock(ctx, oid); | |
2009 | return ret; | |
2010 | } | |
2011 | ||
2012 | int RGWLC::set_bucket_config(RGWBucketInfo& bucket_info, | |
2013 | const map<string, bufferlist>& bucket_attrs, | |
2014 | RGWLifecycleConfiguration *config) | |
2015 | { | |
2016 | map<string, bufferlist> attrs = bucket_attrs; | |
494da23a TL |
2017 | bufferlist lc_bl; |
2018 | config->encode(lc_bl); | |
2019 | ||
2020 | attrs[RGW_ATTR_LC] = std::move(lc_bl); | |
2021 | ||
e306af50 TL |
2022 | int ret = |
2023 | store->ctl()->bucket->set_bucket_instance_attrs( | |
2024 | bucket_info, attrs, &bucket_info.objv_tracker, null_yield); | |
11fdf7f2 TL |
2025 | if (ret < 0) |
2026 | return ret; | |
2027 | ||
2028 | rgw_bucket& bucket = bucket_info.bucket; | |
2029 | ||
f6b5b4d7 | 2030 | |
e306af50 TL |
2031 | ret = guard_lc_modify(store, bucket, cookie, |
2032 | [&](librados::IoCtx *ctx, const string& oid, | |
f6b5b4d7 | 2033 | const cls_rgw_lc_entry& entry) { |
11fdf7f2 TL |
2034 | return cls_rgw_lc_set_entry(*ctx, oid, entry); |
2035 | }); | |
2036 | ||
2037 | return ret; | |
7c673cae FG |
2038 | } |
2039 | ||
11fdf7f2 TL |
2040 | int RGWLC::remove_bucket_config(RGWBucketInfo& bucket_info, |
2041 | const map<string, bufferlist>& bucket_attrs) | |
2042 | { | |
2043 | map<string, bufferlist> attrs = bucket_attrs; | |
2044 | attrs.erase(RGW_ATTR_LC); | |
e306af50 TL |
2045 | int ret = |
2046 | store->ctl()->bucket->set_bucket_instance_attrs( | |
2047 | bucket_info, attrs, &bucket_info.objv_tracker, null_yield); | |
11fdf7f2 TL |
2048 | |
2049 | rgw_bucket& bucket = bucket_info.bucket; | |
2050 | ||
2051 | if (ret < 0) { | |
2052 | ldout(cct, 0) << "RGWLC::RGWDeleteLC() failed to set attrs on bucket=" | |
2053 | << bucket.name << " returned err=" << ret << dendl; | |
2054 | return ret; | |
2055 | } | |
2056 | ||
2057 | ||
e306af50 TL |
2058 | ret = guard_lc_modify(store, bucket, cookie, |
2059 | [&](librados::IoCtx *ctx, const string& oid, | |
f6b5b4d7 | 2060 | const cls_rgw_lc_entry& entry) { |
11fdf7f2 TL |
2061 | return cls_rgw_lc_rm_entry(*ctx, oid, entry); |
2062 | }); | |
2063 | ||
2064 | return ret; | |
e306af50 TL |
2065 | } /* RGWLC::remove_bucket_config */ |
2066 | ||
2067 | RGWLC::~RGWLC() | |
2068 | { | |
2069 | stop_processor(); | |
2070 | finalize(); | |
2071 | } /* ~RGWLC() */ | |
11fdf7f2 TL |
2072 | |
2073 | namespace rgw::lc { | |
2074 | ||
e306af50 TL |
2075 | int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store, |
2076 | const RGWBucketInfo& bucket_info, | |
11fdf7f2 TL |
2077 | const map<std::string,bufferlist>& battrs) |
2078 | { | |
2079 | if (auto aiter = battrs.find(RGW_ATTR_LC); | |
2080 | aiter == battrs.end()) { | |
2081 | return 0; // No entry, nothing to fix | |
2082 | } | |
2083 | ||
2084 | auto shard_name = get_lc_shard_name(bucket_info.bucket); | |
2085 | std::string lc_oid; | |
2086 | get_lc_oid(store->ctx(), shard_name, &lc_oid); | |
2087 | ||
f6b5b4d7 | 2088 | cls_rgw_lc_entry entry; |
11fdf7f2 TL |
2089 | // There are multiple cases we need to encounter here |
2090 | // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets | |
2091 | // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update | |
2092 | // 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker) | |
2093 | // We are not dropping the old marker here as that would be caught by the next LC process update | |
9f95a23c | 2094 | auto lc_pool_ctx = store->getRados()->get_lc_pool_ctx(); |
11fdf7f2 TL |
2095 | int ret = cls_rgw_lc_get_entry(*lc_pool_ctx, |
2096 | lc_oid, shard_name, entry); | |
2097 | if (ret == 0) { | |
2098 | ldout(store->ctx(), 5) << "Entry already exists, nothing to do" << dendl; | |
2099 | return ret; // entry is already existing correctly set to marker | |
2100 | } | |
2101 | ldout(store->ctx(), 5) << "cls_rgw_lc_get_entry errored ret code=" << ret << dendl; | |
2102 | if (ret == -ENOENT) { | |
2103 | ldout(store->ctx(), 1) << "No entry for bucket=" << bucket_info.bucket.name | |
2104 | << " creating " << dendl; | |
2105 | // TODO: we have too many ppl making cookies like this! | |
2106 | char cookie_buf[COOKIE_LEN + 1]; | |
2107 | gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1); | |
2108 | std::string cookie = cookie_buf; | |
2109 | ||
e306af50 TL |
2110 | ret = guard_lc_modify( |
2111 | store, bucket_info.bucket, cookie, | |
f6b5b4d7 TL |
2112 | [&lc_pool_ctx, &lc_oid](librados::IoCtx* ctx, |
2113 | const string& oid, | |
2114 | const cls_rgw_lc_entry& entry) { | |
e306af50 TL |
2115 | return cls_rgw_lc_set_entry(*lc_pool_ctx, lc_oid, entry); |
2116 | }); | |
11fdf7f2 TL |
2117 | |
2118 | } | |
2119 | ||
2120 | return ret; | |
2121 | } | |
2122 | ||
9f95a23c TL |
2123 | std::string s3_expiration_header( |
2124 | DoutPrefixProvider* dpp, | |
2125 | const rgw_obj_key& obj_key, | |
2126 | const RGWObjTags& obj_tagset, | |
2127 | const ceph::real_time& mtime, | |
2128 | const std::map<std::string, buffer::list>& bucket_attrs) | |
2129 | { | |
2130 | CephContext* cct = dpp->get_cct(); | |
2131 | RGWLifecycleConfiguration config(cct); | |
2132 | std::string hdr{""}; | |
2133 | ||
2134 | const auto& aiter = bucket_attrs.find(RGW_ATTR_LC); | |
2135 | if (aiter == bucket_attrs.end()) | |
2136 | return hdr; | |
2137 | ||
2138 | bufferlist::const_iterator iter{&aiter->second}; | |
2139 | try { | |
2140 | config.decode(iter); | |
2141 | } catch (const buffer::error& e) { | |
2142 | ldpp_dout(dpp, 0) << __func__ | |
2143 | << "() decode life cycle config failed" | |
2144 | << dendl; | |
2145 | return hdr; | |
2146 | } /* catch */ | |
2147 | ||
2148 | /* dump tags at debug level 16 */ | |
2149 | RGWObjTags::tag_map_t obj_tag_map = obj_tagset.get_tags(); | |
2150 | if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 16)) { | |
2151 | for (const auto& elt : obj_tag_map) { | |
2152 | ldout(cct, 16) << __func__ | |
2153 | << "() key=" << elt.first << " val=" << elt.second | |
2154 | << dendl; | |
2155 | } | |
2156 | } | |
2157 | ||
2158 | boost::optional<ceph::real_time> expiration_date; | |
2159 | boost::optional<std::string> rule_id; | |
2160 | ||
2161 | const auto& rule_map = config.get_rule_map(); | |
2162 | for (const auto& ri : rule_map) { | |
2163 | const auto& rule = ri.second; | |
2164 | auto& id = rule.get_id(); | |
2165 | auto& prefix = rule.get_prefix(); | |
2166 | auto& filter = rule.get_filter(); | |
2167 | auto& expiration = rule.get_expiration(); | |
2168 | auto& noncur_expiration = rule.get_noncur_expiration(); | |
2169 | ||
2170 | ldpp_dout(dpp, 10) << "rule: " << ri.first | |
2171 | << " prefix: " << prefix | |
2172 | << " expiration: " | |
2173 | << " date: " << expiration.get_date() | |
2174 | << " days: " << expiration.get_days() | |
2175 | << " noncur_expiration: " | |
2176 | << " date: " << noncur_expiration.get_date() | |
2177 | << " days: " << noncur_expiration.get_days() | |
2178 | << dendl; | |
2179 | ||
2180 | /* skip if rule !enabled | |
2181 | * if rule has prefix, skip iff object !match prefix | |
2182 | * if rule has tags, skip iff object !match tags | |
2183 | * note if object is current or non-current, compare accordingly | |
2184 | * if rule has days, construct date expression and save iff older | |
2185 | * than last saved | |
2186 | * if rule has date, convert date expression and save iff older | |
2187 | * than last saved | |
2188 | * if the date accum has a value, format it into hdr | |
2189 | */ | |
2190 | ||
2191 | if (! rule.is_enabled()) | |
2192 | continue; | |
2193 | ||
2194 | if(! prefix.empty()) { | |
2195 | if (! boost::starts_with(obj_key.name, prefix)) | |
2196 | continue; | |
2197 | } | |
2198 | ||
2199 | if (filter.has_tags()) { | |
2200 | bool tag_match = false; | |
2201 | const RGWObjTags& rule_tagset = filter.get_tags(); | |
2202 | for (auto& tag : rule_tagset.get_tags()) { | |
2203 | /* remember, S3 tags are {key,value} tuples */ | |
2204 | auto ma1 = obj_tag_map.find(tag.first); | |
2205 | if ( ma1 != obj_tag_map.end()) { | |
2206 | if (tag.second == ma1->second) { | |
2207 | ldpp_dout(dpp, 10) << "tag match obj_key=" << obj_key | |
2208 | << " rule_id=" << id | |
2209 | << " tag=" << tag | |
2210 | << " (ma=" << *ma1 << ")" | |
2211 | << dendl; | |
2212 | tag_match = true; | |
2213 | break; | |
2214 | } | |
2215 | } | |
2216 | } | |
2217 | if (! tag_match) | |
2218 | continue; | |
2219 | } | |
2220 | ||
2221 | // compute a uniform expiration date | |
2222 | boost::optional<ceph::real_time> rule_expiration_date; | |
2223 | const LCExpiration& rule_expiration = | |
2224 | (obj_key.instance.empty()) ? expiration : noncur_expiration; | |
2225 | ||
2226 | if (rule_expiration.has_date()) { | |
2227 | rule_expiration_date = | |
2228 | boost::optional<ceph::real_time>( | |
2229 | ceph::from_iso_8601(rule.get_expiration().get_date())); | |
2230 | } else { | |
2231 | if (rule_expiration.has_days()) { | |
2232 | rule_expiration_date = | |
2233 | boost::optional<ceph::real_time>( | |
2234 | mtime + make_timespan(rule_expiration.get_days()*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60)); | |
2235 | } | |
2236 | } | |
2237 | ||
2238 | // update earliest expiration | |
2239 | if (rule_expiration_date) { | |
2240 | if ((! expiration_date) || | |
2241 | (*expiration_date > *rule_expiration_date)) { | |
2242 | expiration_date = | |
2243 | boost::optional<ceph::real_time>(rule_expiration_date); | |
2244 | rule_id = boost::optional<std::string>(id); | |
2245 | } | |
2246 | } | |
2247 | } | |
2248 | ||
2249 | // cond format header | |
2250 | if (expiration_date && rule_id) { | |
2251 | // Fri, 23 Dec 2012 00:00:00 GMT | |
2252 | char exp_buf[100]; | |
2253 | time_t exp = ceph::real_clock::to_time_t(*expiration_date); | |
2254 | if (std::strftime(exp_buf, sizeof(exp_buf), | |
2255 | "%a, %d %b %Y %T %Z", std::gmtime(&exp))) { | |
2256 | hdr = fmt::format("expiry-date=\"{0}\", rule-id=\"{1}\"", exp_buf, | |
2257 | *rule_id); | |
2258 | } else { | |
2259 | ldpp_dout(dpp, 0) << __func__ << | |
2260 | "() strftime of life cycle expiration header failed" | |
2261 | << dendl; | |
2262 | } | |
2263 | } | |
2264 | ||
2265 | return hdr; | |
2266 | ||
2267 | } /* rgwlc_s3_expiration_header */ | |
2268 | ||
f6b5b4d7 TL |
2269 | bool s3_multipart_abort_header( |
2270 | DoutPrefixProvider* dpp, | |
2271 | const rgw_obj_key& obj_key, | |
2272 | const ceph::real_time& mtime, | |
2273 | const std::map<std::string, buffer::list>& bucket_attrs, | |
2274 | ceph::real_time& abort_date, | |
2275 | std::string& rule_id) | |
2276 | { | |
2277 | CephContext* cct = dpp->get_cct(); | |
2278 | RGWLifecycleConfiguration config(cct); | |
2279 | ||
2280 | const auto& aiter = bucket_attrs.find(RGW_ATTR_LC); | |
2281 | if (aiter == bucket_attrs.end()) | |
2282 | return false; | |
2283 | ||
2284 | bufferlist::const_iterator iter{&aiter->second}; | |
2285 | try { | |
2286 | config.decode(iter); | |
2287 | } catch (const buffer::error& e) { | |
2288 | ldpp_dout(dpp, 0) << __func__ | |
2289 | << "() decode life cycle config failed" | |
2290 | << dendl; | |
2291 | return false; | |
2292 | } /* catch */ | |
2293 | ||
2294 | std::optional<ceph::real_time> abort_date_tmp; | |
2295 | std::optional<std::string_view> rule_id_tmp; | |
2296 | const auto& rule_map = config.get_rule_map(); | |
2297 | for (const auto& ri : rule_map) { | |
2298 | const auto& rule = ri.second; | |
2299 | const auto& id = rule.get_id(); | |
2300 | const auto& filter = rule.get_filter(); | |
2301 | const auto& prefix = filter.has_prefix()?filter.get_prefix():rule.get_prefix(); | |
2302 | const auto& mp_expiration = rule.get_mp_expiration(); | |
2303 | if (!rule.is_enabled()) { | |
2304 | continue; | |
2305 | } | |
2306 | if(!prefix.empty() && !boost::starts_with(obj_key.name, prefix)) { | |
2307 | continue; | |
2308 | } | |
2309 | ||
2310 | std::optional<ceph::real_time> rule_abort_date; | |
2311 | if (mp_expiration.has_days()) { | |
2312 | rule_abort_date = std::optional<ceph::real_time>( | |
2313 | mtime + make_timespan(mp_expiration.get_days()*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60)); | |
2314 | } | |
2315 | ||
2316 | // update earliest abort date | |
2317 | if (rule_abort_date) { | |
2318 | if ((! abort_date_tmp) || | |
2319 | (*abort_date_tmp > *rule_abort_date)) { | |
2320 | abort_date_tmp = | |
2321 | std::optional<ceph::real_time>(rule_abort_date); | |
2322 | rule_id_tmp = std::optional<std::string_view>(id); | |
2323 | } | |
2324 | } | |
2325 | } | |
2326 | if (abort_date_tmp && rule_id_tmp) { | |
2327 | abort_date = *abort_date_tmp; | |
2328 | rule_id = *rule_id_tmp; | |
2329 | return true; | |
2330 | } else { | |
2331 | return false; | |
2332 | } | |
2333 | } | |
2334 | ||
9f95a23c | 2335 | } /* namespace rgw::lc */ |