]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_lc.cc
import 15.2.5
[ceph.git] / ceph / src / rgw / rgw_lc.cc
index 9a3def0a10822db2f13f44cf231b524dd4aba9cb..f176e8e8141f3433bdec8f44372d73e6188e3f25 100644 (file)
@@ -13,6 +13,7 @@
 #include <boost/algorithm/string/predicate.hpp>
 #include <boost/variant.hpp>
 
+#include "include/scope_guard.h"
 #include "common/Formatter.h"
 #include "common/containers.h"
 #include <common/errno.h>
@@ -26,6 +27,7 @@
 #include "rgw_zone.h"
 #include "rgw_string.h"
 #include "rgw_multi.h"
+#include "rgw_sal.h"
 
 // this seems safe to use, at least for now--arguably, we should
 // prefer header-only fmt, in general
@@ -212,9 +214,10 @@ void *RGWLC::LCWorker::entry() {
     utime_t start = ceph_clock_now();
     if (should_work(start)) {
       ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
-      int r = lc->process(this);
+      int r = lc->process(this, false /* once */);
       if (r < 0) {
-        ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
+        ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r="
+                         << r << dendl;
       }
       ldpp_dout(dpp, 2) << "life cycle: stop" << dendl;
     }
@@ -226,7 +229,8 @@ void *RGWLC::LCWorker::entry() {
     utime_t next;
     next.set_from_double(end + secs);
 
-    ldpp_dout(dpp, 5) << "schedule life cycle next start time: " << rgw_to_asctime(next) << dendl;
+    ldpp_dout(dpp, 5) << "schedule life cycle next start time: "
+                     << rgw_to_asctime(next) << dendl;
 
     std::unique_lock l{lock};
     cond.wait_for(l, std::chrono::seconds(secs));
@@ -262,7 +266,7 @@ void RGWLC::finalize()
   delete[] obj_names;
 }
 
-bool RGWLC::if_already_run_today(time_t& start_date)
+bool RGWLC::if_already_run_today(time_t start_date)
 {
   struct tm bdt;
   time_t begin_of_day;
@@ -286,32 +290,48 @@ bool RGWLC::if_already_run_today(time_t& start_date)
     return false;
 }
 
+static inline std::ostream& operator<<(std::ostream &os, cls_rgw_lc_entry& ent) {
+  os << "<ent: bucket=";
+  os << ent.bucket;
+  os << "; start_time=";
+  os << rgw_to_asctime(utime_t(time_t(ent.start_time), 0));
+  os << "; status=";
+    os << ent.status;
+    os << ">";
+    return os;
+}
+
 int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
 {
-  map<string, int > entries;
-
+  vector<cls_rgw_lc_entry> entries;
   string marker;
 
+  dout(5) << "RGWLC::bucket_lc_prepare(): PREPARE "
+         << "index: " << index << " worker ix: " << worker->ix
+         << dendl;
+
 #define MAX_LC_LIST_ENTRIES 100
   do {
     int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index],
                              marker, MAX_LC_LIST_ENTRIES, entries);
     if (ret < 0)
       return ret;
-    map<string, int>::iterator iter;
-    for (iter = entries.begin(); iter != entries.end(); ++iter) {
-      pair<string, int > entry(iter->first, lc_uninitial);
+
+    for (auto& entry : entries) {
+      entry.start_time = ceph_clock_now();
+      entry.status = lc_uninitial; // lc_uninitial? really?
       ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
-                                obj_names[index],  entry);
+                                obj_names[index], entry);
       if (ret < 0) {
-        ldpp_dout(this, 0) << "RGWLC::bucket_lc_prepare() failed to set entry on "
-            << obj_names[index] << dendl;
+        ldpp_dout(this, 0)
+         << "RGWLC::bucket_lc_prepare() failed to set entry on "
+         << obj_names[index] << dendl;
         return ret;
       }
     }
 
-    if (!entries.empty()) {
-      marker = std::move(entries.rbegin()->first);
+    if (! entries.empty()) {
+      marker = std::move(entries.back().bucket);
     }
   } while (!entries.empty());
 
@@ -332,12 +352,19 @@ static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days,
     cmp = days*cct->_conf->rgw_lc_debug_interval;
     base_time = ceph_clock_now();
   }
-  timediff = base_time - ceph::real_clock::to_time_t(mtime);
+  auto tt_mtime = ceph::real_clock::to_time_t(mtime);
+  timediff = base_time - tt_mtime;
 
   if (expire_time) {
     *expire_time = mtime + make_timespan(cmp);
   }
-  ldout(cct, 20) << __func__ << "(): mtime=" << mtime << " days=" << days << " base_time=" << base_time << " timediff=" << timediff << " cmp=" << cmp << dendl;
+
+  ldout(cct, 20) << __func__ << __func__
+                << "(): mtime=" << mtime << " days=" << days
+                << " base_time=" << base_time << " timediff=" << timediff
+                << " cmp=" << cmp
+                << " is_expired=" << (timediff >= cmp) 
+                << dendl;
 
   return (timediff >= cmp);
 }
@@ -366,7 +393,8 @@ static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info,
       try {
         decode(retention, iter->second);
       } catch (buffer::error& err) {
-        ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention" << dendl;
+        ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention"
+                              << dendl;
         return false;
       }
       if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) >
@@ -380,7 +408,8 @@ static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info,
       try {
         decode(obj_legal_hold, iter->second);
       } catch (buffer::error& err) {
-        ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectLegalHold" << dendl;
+        ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectLegalHold"
+                              << dendl;
         return false;
       }
       if (obj_legal_hold.is_enabled()) {
@@ -471,22 +500,24 @@ public:
     ++obj_iter;
   }
 
-  bool next_has_same_name()
-  {
-    if ((obj_iter + 1) == objs.end()) {
+  boost::optional<std::string> next_key_name() {
+    if (obj_iter == objs.end() ||
+       (obj_iter + 1) == objs.end()) {
       /* this should have been called after get_obj() was called, so this should
        * only happen if is_truncated is false */
-      return false;
+      return boost::none;
     }
-    return (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0);
+
+    return ((obj_iter + 1)->key.name);
   }
+
 }; /* LCObjsLister */
 
 struct op_env {
 
   using LCWorker = RGWLC::LCWorker;
 
-  lc_op& op;
+  lc_op op;
   rgw::sal::RGWRadosStore *store;
   LCWorker* worker;
   RGWBucketInfo& bucket_info;
@@ -499,26 +530,40 @@ struct op_env {
 }; /* op_env */
 
 class LCRuleOp;
+class WorkQ;
 
 struct lc_op_ctx {
   CephContext *cct;
-  op_env& env;
-  rgw_bucket_dir_entry& o;
+  op_env env;
+  rgw_bucket_dir_entry o;
+  boost::optional<std::string> next_key_name;
+  ceph::real_time effective_mtime;
 
   rgw::sal::RGWRadosStore *store;
   RGWBucketInfo& bucket_info;
-  lc_op& op;
+  lc_op& op; // ok--refers to expanded env.op
   LCObjsLister& ol;
 
   rgw_obj obj;
   RGWObjectCtx rctx;
   const DoutPrefixProvider *dpp;
-
-  lc_op_ctx(op_env& _env, rgw_bucket_dir_entry& _o,
-           const DoutPrefixProvider *_dpp)
-    : cct(_env.store->ctx()), env(_env), o(_o),
+  WorkQ* wq;
+
+  lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
+           boost::optional<std::string> next_key_name,
+           ceph::real_time effective_mtime,
+           const DoutPrefixProvider *dpp, WorkQ* wq)
+    : cct(env.store->ctx()), env(env), o(o), next_key_name(next_key_name),
+      effective_mtime(effective_mtime),
       store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
-      obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(_dpp) {}
+      obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(dpp), wq(wq)
+    {}
+
+  bool next_has_same_name(const std::string& key_name) {
+    return (next_key_name && key_name.compare(
+             boost::get<std::string>(next_key_name)) == 0);
+  }
+
 }; /* lc_op_ctx */
 
 static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
@@ -548,10 +593,6 @@ static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
   del_op.params.obj_owner = obj_owner;
   del_op.params.unmod_since = meta.mtime;
 
-  if (perfcounter) {
-    perfcounter->inc(l_rgw_lc_remove_expired, 1);
-  }
-
   return del_op.delete_obj(null_yield);
 } /* remove_expired_obj */
 
@@ -561,7 +602,7 @@ public:
 
   virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time) {
     return false;
-  };
+  }
 
   /* called after check(). Check should tell us whether this action
    * is applicable. If there are multiple actions, we'll end up executing
@@ -581,6 +622,8 @@ public:
   virtual int process(lc_op_ctx& oc) {
     return 0;
   }
+
+  friend class LCOpRule;
 }; /* LCOpAction */
 
 class LCOpFilter {
@@ -594,49 +637,73 @@ virtual ~LCOpFilter() {}
 class LCOpRule {
   friend class LCOpAction;
 
-  op_env& env;
+  op_env env;
+  boost::optional<std::string> next_key_name;
+  ceph::real_time effective_mtime;
 
-  std::vector<unique_ptr<LCOpFilter> > filters;
-  std::vector<unique_ptr<LCOpAction> > actions;
+  std::vector<shared_ptr<LCOpFilter> > filters; // n.b., sharing ovhd
+  std::vector<shared_ptr<LCOpAction> > actions;
 
 public:
   LCOpRule(op_env& _env) : env(_env) {}
 
+  boost::optional<std::string> get_next_key_name() {
+    return next_key_name;
+  }
+
+  std::vector<shared_ptr<LCOpAction>>& get_actions() {
+    return actions;
+  }
+
   void build();
-  int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp);
+  void update();
+  int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
+             WorkQ* wq);
 }; /* LCOpRule */
 
 using WorkItem =
   boost::variant<void*,
                 /* out-of-line delete */
-                std::tuple<LCOpRule&, rgw_bucket_dir_entry>,
+                std::tuple<LCOpRule, rgw_bucket_dir_entry>,
                 /* uncompleted MPU expiration */
-                std::tuple<const lc_op&, rgw_bucket_dir_entry>,
+                std::tuple<lc_op, rgw_bucket_dir_entry>,
                 rgw_bucket_dir_entry>;
 
 class WorkQ : public Thread
 {
 public:
   using unique_lock = std::unique_lock<std::mutex>;
-  using work_f = std::function<void(RGWLC::LCWorker*, WorkItem&)>;
+  using work_f = std::function<void(RGWLC::LCWorker*, WorkQ*, WorkItem&)>;
   using dequeue_result = boost::variant<void*, WorkItem>;
 
+  static constexpr uint32_t FLAG_NONE =        0x0000;
+  static constexpr uint32_t FLAG_EWAIT_SYNC =  0x0001;
+  static constexpr uint32_t FLAG_DWAIT_SYNC =  0x0002;
+  static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
+
 private:
-  const work_f bsf = [](RGWLC::LCWorker* wk, WorkItem& wi) {};
+  const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {};
   RGWLC::LCWorker* wk;
   uint32_t qmax;
+  int ix;
   std::mutex mtx;
   std::condition_variable cv;
+  uint32_t flags;
   vector<WorkItem> items;
   work_f f;
 
 public:
   WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
-    : wk(wk), qmax(qmax), f(bsf)
+    : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf)
     {
-      create((string{"workpool_thr_"} + to_string(ix)).c_str());
+      create(thr_name().c_str());
     }
 
+  std::string thr_name() {
+    return std::string{"wp_thrd: "}
+    + std::to_string(wk->ix) + ", " + std::to_string(ix);
+  }
+
   void setf(work_f _f) {
     f = _f;
   }
@@ -645,15 +712,20 @@ public:
     unique_lock uniq(mtx);
     while ((!wk->get_lc()->going_down()) &&
           (items.size() > qmax)) {
+      flags |= FLAG_EWAIT_SYNC;
       cv.wait_for(uniq, 200ms);
     }
     items.push_back(item);
+    if (flags & FLAG_DWAIT_SYNC) {
+      flags &= ~FLAG_DWAIT_SYNC;
+      cv.notify_one();
+    }
   }
 
   void drain() {
     unique_lock uniq(mtx);
-    while ((!wk->get_lc()->going_down()) &&
-          (items.size() > 0)) {
+    flags |= FLAG_EDRAIN_SYNC;
+    while (flags & FLAG_EDRAIN_SYNC) {
       cv.wait_for(uniq, 200ms);
     }
   }
@@ -663,11 +735,20 @@ private:
     unique_lock uniq(mtx);
     while ((!wk->get_lc()->going_down()) &&
           (items.size() == 0)) {
+      /* clear drain state, as we are NOT doing work and qlen==0 */
+      if (flags & FLAG_EDRAIN_SYNC) {
+       flags &= ~FLAG_EDRAIN_SYNC;
+      }
+      flags |= FLAG_DWAIT_SYNC;
       cv.wait_for(uniq, 200ms);
     }
     if (items.size() > 0) {
       auto item = items.back();
       items.pop_back();
+      if (flags & FLAG_EWAIT_SYNC) {
+       flags &= ~FLAG_EWAIT_SYNC;
+       cv.notify_one();
+      }
       return {item};
     }
     return nullptr;
@@ -680,7 +761,7 @@ private:
        /* going down */
        break;
       }
-      f(wk, boost::get<WorkItem>(item));
+      f(wk, this, boost::get<WorkItem>(item));
     }
     return nullptr;
   }
@@ -702,6 +783,12 @@ public:
       ix(0)
     {}
 
+  ~WorkPool() {
+    for (auto& wq : wqs) {
+      wq.join();
+    }
+  }
+
   void setf(WorkQ::work_f _f) {
     for (auto& wq : wqs) {
       wq.setf(_f);
@@ -721,17 +808,22 @@ public:
   }
 }; /* WorkPool */
 
-RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct,
-                         RGWLC *_lc)
-  : dpp(_dpp), cct(_cct), lc(_lc)
+RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct,
+                         RGWLC *lc, int ix)
+  : dpp(dpp), cct(cct), lc(lc), ix(ix)
 {
   auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
   workpool = new WorkPool(this, wpw, 512);
 }
 
+static inline bool worker_should_stop(time_t stop_at, bool once)
+{
+  return !once && stop_at < time(nullptr);
+}
+
 int RGWLC::handle_multipart_expiration(
   RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map,
-  LCWorker* worker)
+  LCWorker* worker, time_t stop_at, bool once)
 {
   MultipartMetaFilter mp_filter;
   vector<rgw_bucket_dir_entry> objs;
@@ -748,8 +840,8 @@ int RGWLC::handle_multipart_expiration(
   list_op.params.ns = RGW_OBJ_NS_MULTIPART;
   list_op.params.filter = &mp_filter;
 
-  auto pf = [&](RGWLC::LCWorker* wk, WorkItem& wi) {
-    auto wt = boost::get<std::tuple<const lc_op&, rgw_bucket_dir_entry>>(wi);
+  auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
+    auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
     auto& [rule, obj] = wt;
     RGWMPObj mp_obj;
     if (obj_has_expired(cct, obj.meta.mtime, rule.mp_expiration)) {
@@ -758,18 +850,26 @@ int RGWLC::handle_multipart_expiration(
        return;
       }
       RGWObjectCtx rctx(store);
-      ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
-      if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
-       ldpp_dout(wk->get_lc(), 0)
-         << "ERROR: abort_multipart_upload failed, ret=" << ret
-         << ", meta:" << obj.key
-         << dendl;
-      } else if (ret == -ERR_NO_SUCH_UPLOAD) {
-       ldpp_dout(wk->get_lc(), 5)
-         << "ERROR: abort_multipart_upload failed, ret=" << ret
-         << ", meta:" << obj.key
-         << dendl;
-      }
+      int ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
+      if (ret == 0) {
+        if (perfcounter) {
+          perfcounter->inc(l_rgw_lc_abort_mpu, 1);
+        }
+      } else {
+       if (ret == -ERR_NO_SUCH_UPLOAD) {
+         ldpp_dout(wk->get_lc(), 5)
+           << "ERROR: abort_multipart_upload failed, ret=" << ret
+           << wq->thr_name()
+           << ", meta:" << obj.key
+           << dendl;
+       } else {
+         ldpp_dout(wk->get_lc(), 0)
+           << "ERROR: abort_multipart_upload failed, ret=" << ret
+           << wq->thr_name()
+           << ", meta:" << obj.key
+           << dendl;
+       }
+      } /* abort failed */
     } /* expired */
   };
 
@@ -777,6 +877,14 @@ int RGWLC::handle_multipart_expiration(
 
   for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
        ++prefix_iter) {
+
+    if (worker_should_stop(stop_at, once)) {
+      ldout(cct, 5) << __func__ << " interval budget EXPIRED worker "
+                    << worker->ix
+                    << dendl;
+      return 0;
+    }
+
     if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
       continue;
     }
@@ -793,11 +901,10 @@ int RGWLC::handle_multipart_expiration(
       }
 
       for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
-       std::tuple<const lc_op&, rgw_bucket_dir_entry> t1 =
+       std::tuple<lc_op, rgw_bucket_dir_entry> t1 =
          {prefix_iter->second, *obj_iter};
        worker->workpool->enqueue(WorkItem{t1});
        if (going_down()) {
-         worker->workpool->drain();
          return 0;
        }
       } /* for objs */
@@ -841,6 +948,8 @@ static inline bool has_all_tags(const lc_op& rule_action,
   for (const auto& tag : object_tags.get_tags()) {
     const auto& rule_tags = rule_action.obj_tags->get_tags();
     const auto& iter = rule_tags.find(tag.first);
+    if(iter == rule_tags.end())
+        continue;
     if(iter->second == tag.second)
     {
       tag_count++;
@@ -862,7 +971,8 @@ static int check_tags(lc_op_ctx& oc, bool *skip)
                            oc.rctx, tags_bl);
     if (ret < 0) {
       if (ret != -ENODATA) {
-        ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" << ret << dendl;
+        ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r="
+                        << ret << " " << oc.wq->thr_name() << dendl;
       }
       return 0;
     }
@@ -871,12 +981,16 @@ static int check_tags(lc_op_ctx& oc, bool *skip)
       auto iter = tags_bl.cbegin();
       dest_obj_tags.decode(iter);
     } catch (buffer::error& err) {
-      ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
+      ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet "
+                     << oc.wq->thr_name() << dendl;
       return -EIO;
     }
 
     if (! has_all_tags(op, dest_obj_tags)) {
-      ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj << " as tags do not match in rule: " << op.id << dendl;
+      ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj
+                       << " as tags do not match in rule: "
+                       << op.id << " "
+                       << oc.wq->thr_name() << dendl;
       return 0;
     }
   }
@@ -900,7 +1014,9 @@ public:
       if (ret == -ENOENT) {
         return false;
       }
-      ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj << " returned ret=" << ret << dendl;
+      ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj
+                      << " returned ret=" << ret << " "
+                      << oc.wq->thr_name() << dendl;
       return false;
     }
 
@@ -910,16 +1026,28 @@ public:
 
 class LCOpAction_CurrentExpiration : public LCOpAction {
 public:
+  LCOpAction_CurrentExpiration(op_env& env) {}
+
   bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
     auto& o = oc.o;
     if (!o.is_current()) {
-      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not current, skipping" << dendl;
+      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+                       << ": not current, skipping "
+                       << oc.wq->thr_name() << dendl;
       return false;
     }
     if (o.is_delete_marker()) {
-      if (oc.ol.next_has_same_name()) {
-        return false;
+      std::string nkn;
+      if (oc.next_key_name) nkn = *oc.next_key_name;
+      if (oc.next_has_same_name(o.key.name)) {
+       ldout(oc.cct, 7) << __func__ << "(): dm-check SAME: key=" << o.key
+                      << " next_key_name: %%" << nkn << "%% "
+                      << oc.wq->thr_name() << dendl;
+       return false;
       } else {
+       ldout(oc.cct, 7) << __func__ << "(): dm-check DELE: key=" << o.key
+                        << " next_key_name: %%" << nkn << "%% "
+                        << oc.wq->thr_name() << dendl;
         *exp_time = real_clock::now();
         return true;
       }
@@ -930,7 +1058,9 @@ public:
     auto& op = oc.op;
     if (op.expiration <= 0) {
       if (op.expiration_date == boost::none) {
-        ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no expiration set in rule, skipping" << dendl;
+        ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+                         << ": no expiration set in rule, skipping "
+                         << oc.wq->thr_name() << dendl;
         return false;
       }
       is_expired = ceph_clock_now() >=
@@ -940,7 +1070,9 @@ public:
       is_expired = obj_has_expired(oc.cct, mtime, op.expiration, exp_time);
     }
 
-    ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << (int)is_expired << dendl;
+    ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
+                     << (int)is_expired << " "
+                     << oc.wq->thr_name() << dendl;
     return is_expired;
   }
 
@@ -949,34 +1081,59 @@ public:
     int r;
     if (o.is_delete_marker()) {
       r = remove_expired_obj(oc, true);
+      if (r < 0) {
+       ldout(oc.cct, 0) << "ERROR: current is-dm remove_expired_obj "
+                        << oc.bucket_info.bucket << ":" << o.key
+                        << " " << cpp_strerror(r) << " "
+                        << oc.wq->thr_name() << dendl;
+      return r;
+      }
+      ldout(oc.cct, 2) << "DELETED: current is-dm "
+                      << oc.bucket_info.bucket << ":" << o.key
+                      << " " << oc.wq->thr_name() << dendl;
     } else {
+      /* ! o.is_delete_marker() */
       r = remove_expired_obj(oc, !oc.bucket_info.versioned());
+      if (r < 0) {
+       ldout(oc.cct, 0) << "ERROR: remove_expired_obj "
+                        << oc.bucket_info.bucket << ":" << o.key
+                        << " " << cpp_strerror(r) << " "
+                        << oc.wq->thr_name() << dendl;
+       return r;
+      }
+      if (perfcounter) {
+        perfcounter->inc(l_rgw_lc_expire_current, 1);
+      }
+      ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+                      << " " << oc.wq->thr_name() << dendl;
     }
-    if (r < 0) {
-      ldout(oc.cct, 0) << "ERROR: remove_expired_obj " 
-      << oc.bucket_info.bucket << ":" << o.key 
-      << " " << cpp_strerror(r) << dendl;
-      return r;
-    }
-    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << dendl;
     return 0;
   }
 };
 
 class LCOpAction_NonCurrentExpiration : public LCOpAction {
+protected:
 public:
+  LCOpAction_NonCurrentExpiration(op_env& env)
+    {}
+
   bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
     auto& o = oc.o;
     if (o.is_current()) {
-      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": current version, skipping" << dendl;
+      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+                       << ": current version, skipping "
+                       << oc.wq->thr_name() << dendl;
       return false;
     }
 
-    auto mtime = oc.ol.get_prev_obj().meta.mtime;
     int expiration = oc.op.noncur_expiration;
-    bool is_expired = obj_has_expired(oc.cct, mtime, expiration, exp_time);
+    bool is_expired = obj_has_expired(oc.cct, oc.effective_mtime, expiration,
+                                     exp_time);
+
+    ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
+                     << is_expired << " "
+                     << oc.wq->thr_name() << dendl;
 
-    ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
     return is_expired &&
       pass_object_lock_check(oc.store->getRados(),
                             oc.bucket_info, oc.obj, oc.rctx);
@@ -987,26 +1144,37 @@ public:
     int r = remove_expired_obj(oc, true);
     if (r < 0) {
       ldout(oc.cct, 0) << "ERROR: remove_expired_obj (non-current expiration) " 
-      << oc.bucket_info.bucket << ":" << o.key 
-      << " " << cpp_strerror(r) << dendl;
+                      << oc.bucket_info.bucket << ":" << o.key 
+                      << " " << cpp_strerror(r)
+                      << " " << oc.wq->thr_name() << dendl;
       return r;
     }
-    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (non-current expiration)" << dendl;
+    if (perfcounter) {
+      perfcounter->inc(l_rgw_lc_expire_noncurrent, 1);
+    }
+    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+                    << " (non-current expiration) "
+                    << oc.wq->thr_name() << dendl;
     return 0;
   }
 };
 
 class LCOpAction_DMExpiration : public LCOpAction {
 public:
+  LCOpAction_DMExpiration(op_env& env) {}
+
   bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
     auto& o = oc.o;
     if (!o.is_delete_marker()) {
-      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not a delete marker, skipping" << dendl;
+      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+                       << ": not a delete marker, skipping "
+                       << oc.wq->thr_name() << dendl;
       return false;
     }
-
-    if (oc.ol.next_has_same_name()) {
-      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": next is same object, skipping" << dendl;
+    if (oc.next_has_same_name(o.key.name)) {
+      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+                       << ": next is same object, skipping "
+                       << oc.wq->thr_name() << dendl;
       return false;
     }
 
@@ -1020,11 +1188,18 @@ public:
     int r = remove_expired_obj(oc, true);
     if (r < 0) {
       ldout(oc.cct, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
-      << oc.bucket_info.bucket << ":" << o.key
-      << " " << cpp_strerror(r) << dendl;
+                      << oc.bucket_info.bucket << ":" << o.key
+                      << " " << cpp_strerror(r)
+                      << " " << oc.wq->thr_name()
+                      << dendl;
       return r;
     }
-    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (delete marker expiration)" << dendl;
+    if (perfcounter) {
+      perfcounter->inc(l_rgw_lc_expire_dm, 1);
+    }
+    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+                    << " (delete marker expiration) "
+                    << oc.wq->thr_name() << dendl;
     return 0;
   }
 };
@@ -1055,7 +1230,9 @@ public:
     bool is_expired;
     if (transition.days < 0) {
       if (transition.date == boost::none) {
-        ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no transition day/date set in rule, skipping" << dendl;
+        ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+                         << ": no transition day/date set in rule, skipping "
+                         << oc.wq->thr_name() << dendl;
         return false;
       }
       is_expired = ceph_clock_now() >=
@@ -1065,7 +1242,9 @@ public:
       is_expired = obj_has_expired(oc.cct, mtime, transition.days, exp_time);
     }
 
-    ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
+    ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
+                     << is_expired << " "
+                     << oc.wq->thr_name() << dendl;
 
     need_to_process =
       (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) !=
@@ -1087,9 +1266,11 @@ public:
 
     if (!oc.store->svc()->zone->get_zone_params().
        valid_placement(target_placement)) {
-      ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " << target_placement
+      ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: "
+                          << target_placement
                            << " bucket="<< oc.bucket_info.bucket
-                           << " rule_id=" << oc.op.id << dendl;
+                           << " rule_id=" << oc.op.id
+                          << " " << oc.wq->thr_name() << dendl;
       return -EINVAL;
     }
 
@@ -1098,12 +1279,16 @@ public:
       o.versioned_epoch, oc.dpp, null_yield);
     if (r < 0) {
       ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj " 
-      << oc.bucket_info.bucket << ":" << o.key 
-      << " -> " << transition.storage_class 
-      << " " << cpp_strerror(r) << dendl;
+                          << oc.bucket_info.bucket << ":" << o.key 
+                          << " -> " << transition.storage_class 
+                          << " " << cpp_strerror(r)
+                          << " " << oc.wq->thr_name() << dendl;
       return r;
     }
-    ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket << ":" << o.key << " -> " << transition.storage_class << dendl;
+    ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket
+                        << ":" << o.key << " -> "
+                        << transition.storage_class
+                        << " " << oc.wq->thr_name() << dendl;
     return 0;
   }
 };
@@ -1120,6 +1305,15 @@ protected:
 public:
   LCOpAction_CurrentTransition(const transition_action& _transition)
     : LCOpAction_Transition(_transition) {}
+    int process(lc_op_ctx& oc) {
+      int r = LCOpAction_Transition::process(oc);
+      if (r == 0) {
+        if (perfcounter) {
+          perfcounter->inc(l_rgw_lc_transition_current, 1);
+        }
+      }
+      return r;
+    }
 };
 
 class LCOpAction_NonCurrentTransition : public LCOpAction_Transition {
@@ -1129,11 +1323,22 @@ protected:
   }
 
   ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
-    return oc.ol.get_prev_obj().meta.mtime;
+    return oc.effective_mtime;
   }
 public:
-  LCOpAction_NonCurrentTransition(const transition_action& _transition)
-    : LCOpAction_Transition(_transition) {}
+  LCOpAction_NonCurrentTransition(op_env& env,
+                                 const transition_action& _transition)
+    : LCOpAction_Transition(_transition)
+    {}
+    int process(lc_op_ctx& oc) {
+      int r = LCOpAction_Transition::process(oc);
+      if (r == 0) {
+        if (perfcounter) {
+          perfcounter->inc(l_rgw_lc_transition_noncurrent, 1);
+        }
+      }
+      return r;
+    }
 };
 
 void LCOpRule::build()
@@ -1144,15 +1349,15 @@ void LCOpRule::build()
 
   if (op.expiration > 0 ||
       op.expiration_date != boost::none) {
-    actions.emplace_back(new LCOpAction_CurrentExpiration);
+    actions.emplace_back(new LCOpAction_CurrentExpiration(env));
   }
 
   if (op.dm_expiration) {
-    actions.emplace_back(new LCOpAction_DMExpiration);
+    actions.emplace_back(new LCOpAction_DMExpiration(env));
   }
 
   if (op.noncur_expiration > 0) {
-    actions.emplace_back(new LCOpAction_NonCurrentExpiration);
+    actions.emplace_back(new LCOpAction_NonCurrentExpiration(env));
   }
 
   for (auto& iter : op.transitions) {
@@ -1160,15 +1365,22 @@ void LCOpRule::build()
   }
 
   for (auto& iter : op.noncur_transitions) {
-    actions.emplace_back(new LCOpAction_NonCurrentTransition(iter.second));
+    actions.emplace_back(new LCOpAction_NonCurrentTransition(env, iter.second));
   }
 }
 
-int LCOpRule::process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp)
+void LCOpRule::update()
 {
-  lc_op_ctx ctx(env, o, dpp);
+  next_key_name = env.ol.next_key_name();
+  effective_mtime = env.ol.get_prev_obj().meta.mtime;
+}
 
-  unique_ptr<LCOpAction> *selected = nullptr;
+int LCOpRule::process(rgw_bucket_dir_entry& o,
+                     const DoutPrefixProvider *dpp,
+                     WorkQ* wq)
+{
+  lc_op_ctx ctx(env, o, next_key_name, effective_mtime, dpp, wq);
+  shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing
   real_time exp;
 
   for (auto& a : actions) {
@@ -1203,25 +1415,30 @@ int LCOpRule::process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp)
     }
 
     if (!cont) {
-      ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": no rule match, skipping" << dendl;
+      ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
+                        << ": no rule match, skipping "
+                        << " " << wq->thr_name() << dendl;
       return 0;
     }
 
     int r = (*selected)->process(ctx);
     if (r < 0) {
       ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj " 
-      << env.bucket_info.bucket << ":" << o.key
-      << " " << cpp_strerror(r) << dendl;
+                       << env.bucket_info.bucket << ":" << o.key
+                       << " " << cpp_strerror(r)
+                       << " " << wq->thr_name() << dendl;
       return r;
     }
-    ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":" << o.key << dendl;
+    ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":"
+                      << o.key << " " << wq->thr_name() << dendl;
   }
 
   return 0;
 
 }
 
-int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
+int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
+                            time_t stop_at, bool once)
 {
   RGWLifecycleConfiguration  config(cct);
   RGWBucketInfo bucket_info;
@@ -1237,13 +1454,22 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
     store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield,
     &bucket_attrs);
   if (ret < 0) {
-    ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name << " failed" << dendl;
+    ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name
+                      << " failed" << dendl;
     return ret;
   }
 
+  auto stack_guard = make_scope_guard(
+    [&worker, &bucket_info]
+      {
+       worker->workpool->drain();
+      }
+    );
+
   if (bucket_info.bucket.marker != bucket_marker) {
-    ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" << bucket_tenant
-                       << ":" << bucket_name << " cur_marker=" << bucket_info.bucket.marker
+    ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket="
+                      << bucket_tenant << ":" << bucket_name
+                      << " cur_marker=" << bucket_info.bucket.marker
                        << " orig_marker=" << bucket_marker << dendl;
     return -ENOENT;
   }
@@ -1258,21 +1484,24 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
   try {
       config.decode(iter);
     } catch (const buffer::error& e) {
-      ldpp_dout(this, 0) << __func__ <<  "() decode life cycle config failed" << dendl;
+      ldpp_dout(this, 0) << __func__ <<  "() decode life cycle config failed"
+                        << dendl;
       return -1;
     }
 
-  auto pf = [](RGWLC::LCWorker* wk, WorkItem& wi) {
+  auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
     auto wt =
-      boost::get<std::tuple<LCOpRule&, rgw_bucket_dir_entry>>(wi);
+      boost::get<std::tuple<LCOpRule, rgw_bucket_dir_entry>>(wi);
     auto& [op_rule, o] = wt;
+
     ldpp_dout(wk->get_lc(), 20)
-      << __func__ << "(): key=" << o.key << dendl;
-    std::cout << "KEY2: " << o.key << std::endl;
-    int ret = op_rule.process(o, wk->dpp);
+      << __func__ << "(): key=" << o.key << wq->thr_name() 
+      << dendl;
+    int ret = op_rule.process(o, wk->dpp, wq);
     if (ret < 0) {
       ldpp_dout(wk->get_lc(), 20)
        << "ERROR: orule.process() returned ret=" << ret
+       << wq->thr_name() 
        << dendl;
     }
   };
@@ -1287,11 +1516,20 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
   rgw_obj_key next_marker;
   for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
       ++prefix_iter) {
+
+    if (worker_should_stop(stop_at, once)) {
+      ldout(cct, 5) << __func__ << " interval budget EXPIRED worker "
+                    << worker->ix
+                    << dendl;
+      return 0;
+    }
+
     auto& op = prefix_iter->second;
     if (!is_valid_op(op)) {
       continue;
     }
-    ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first << dendl;
+    ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first
+                       << dendl;
     if (prefix_iter != prefix_map.begin() && 
         (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(),
                                    prev(prefix_iter)->first) == 0)) {
@@ -1314,25 +1552,21 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
     op_env oenv(op, store, worker, bucket_info, ol);
     LCOpRule orule(oenv);
     orule.build(); // why can't ctor do it?
-#if 0
-    /* would permit passing o by reference, removes fetch overlap */
-    auto fetch_barrier = [&worker]()
-                          { worker->workpool->drain(); };
-#endif
     rgw_bucket_dir_entry* o{nullptr};
     for (; ol.get_obj(&o /* , fetch_barrier */); ol.next()) {
-      std::tuple<LCOpRule&, rgw_bucket_dir_entry> t1 = {orule, *o};
+      orule.update();
+      std::tuple<LCOpRule, rgw_bucket_dir_entry> t1 = {orule, *o};
       worker->workpool->enqueue(WorkItem{t1});
     }
     worker->workpool->drain();
   }
 
-  ret = handle_multipart_expiration(&target, prefix_map, worker);
+  ret = handle_multipart_expiration(&target, prefix_map, worker, stop_at, once);
   return ret;
 }
 
 int RGWLC::bucket_lc_post(int index, int max_lock_sec,
-                         pair<string, int>& entry, int& result,
+                         cls_rgw_lc_entry& entry, int& result,
                          LCWorker* worker)
 {
   utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
@@ -1341,18 +1575,24 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec,
   l.set_cookie(cookie);
   l.set_duration(lock_duration);
 
+  dout(5) << "RGWLC::bucket_lc_post(): POST " << entry
+         << " index: " << index << " worker ix: " << worker->ix
+         << dendl;
+
   do {
     int ret = l.lock_exclusive(
       &store->getRados()->lc_pool_ctx, obj_names[index]);
-    if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
+    if (ret == -EBUSY || ret == -EEXIST) {
+      /* already locked by another lc processor */
       ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
-          << obj_names[index] << ", sleep 5, try again" << dendl;
+                        << obj_names[index] << ", sleep 5, try again " << dendl;
       sleep(5);
       continue;
     }
     if (ret < 0)
       return 0;
-    ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] << dendl;
+    ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
+                       << dendl;
     if (result ==  -ENOENT) {
       ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx,
                                obj_names[index],  entry);
@@ -1362,9 +1602,9 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec,
       }
       goto clean;
     } else if (result < 0) {
-      entry.second = lc_failed;
+      entry.status = lc_failed;
     } else {
-      entry.second = lc_complete;
+      entry.status = lc_complete;
     }
 
     ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
@@ -1375,18 +1615,19 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec,
     }
 clean:
     l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
-    ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock " << obj_names[index] << dendl;
+    ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock "
+                       << obj_names[index] << dendl;
     return 0;
   } while (true);
 }
 
-int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries,
-                           map<string, int>* progress_map)
+int RGWLC::list_lc_progress(string& marker, uint32_t max_entries,
+                           vector<cls_rgw_lc_entry>& progress_map,
+                           int& index)
 {
-  int index = 0;
-  progress_map->clear();
-  for(; index <max_objs; index++) {
-    map<string, int > entries;
+  progress_map.clear();
+  for(; index < max_objs; index++, marker="") {
+    vector<cls_rgw_lc_entry> entries;
     int ret =
       cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker,
                      max_entries, entries);
@@ -1399,10 +1640,15 @@ int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries,
         return ret;
       }
     }
-    map<string, int>::iterator iter;
-    for (iter = entries.begin(); iter != entries.end(); ++iter) {
-      progress_map->insert(*iter);
-    }
+    progress_map.reserve(progress_map.size() + entries.size());
+    progress_map.insert(progress_map.end(), entries.begin(), entries.end());
+
+    /* update index, marker tuple */
+    if (progress_map.size() > 0)
+      marker = progress_map.back().bucket;
+
+    if (progress_map.size() >= max_entries)
+      break;
   }
   return 0;
 }
@@ -1418,7 +1664,7 @@ static inline vector<int> random_sequence(uint32_t n)
   return v;
 }
 
-int RGWLC::process(LCWorker* worker)
+int RGWLC::process(LCWorker* worker, bool once = false)
 {
   int max_secs = cct->_conf->rgw_lc_lock_max_time;
 
@@ -1426,7 +1672,7 @@ int RGWLC::process(LCWorker* worker)
    * that might be running in parallel */
   vector<int> shard_seq = random_sequence(max_objs);
   for (auto index : shard_seq) {
-    int ret = process(index, max_secs, worker);
+    int ret = process(index, max_secs, worker, once);
     if (ret < 0)
       return ret;
   }
@@ -1434,12 +1680,44 @@ int RGWLC::process(LCWorker* worker)
   return 0;
 }
 
-int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
+bool RGWLC::expired_session(time_t started)
 {
+  time_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
+    ? cct->_conf->rgw_lc_debug_interval
+    : 24*60*60;
+
+  auto now = time(nullptr);
+
+  dout(16) << "RGWLC::expired_session"
+          << " started: " << started
+          << " interval: " << interval << "(*2==" << 2*interval << ")"
+          << " now: " << now
+          << dendl;
+
+  return (started + 2*interval < now);
+}
+
+time_t RGWLC::thread_stop_at()
+{
+  uint64_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
+    ? cct->_conf->rgw_lc_debug_interval
+    : 24*60*60;
+
+  return time(nullptr) + interval;
+}
+
+int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
+  bool once = false)
+{
+  dout(5) << "RGWLC::process(): ENTER: "
+         << "index: " << index << " worker ix: " << worker->ix
+         << dendl;
+
   rados::cls::lock::Lock l(lc_index_lock_name);
   do {
     utime_t now = ceph_clock_now();
-    pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
+    //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
+    cls_rgw_lc_entry entry;
     if (max_lock_secs <= 0)
       return -EAGAIN;
 
@@ -1448,7 +1726,8 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
 
     int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx,
                               obj_names[index]);
-    if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
+    if (ret == -EBUSY || ret == -EEXIST) {
+      /* already locked by another lc processor */
       ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
           << obj_names[index] << ", sleep 5, try again" << dendl;
       sleep(5);
@@ -1466,7 +1745,28 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
       goto exit;
     }
 
-    if(!if_already_run_today(head.start_date)) {
+    if (! (cct->_conf->rgw_lc_lock_max_time == 9969)) {
+      ret = cls_rgw_lc_get_entry(store->getRados()->lc_pool_ctx,
+                                obj_names[index], head.marker, entry);
+      if (ret >= 0) {
+       if (entry.status == lc_processing) {
+         if (expired_session(entry.start_time)) {
+           dout(5) << "RGWLC::process(): STALE lc session found for: " << entry
+                   << " index: " << index << " worker ix: " << worker->ix
+                   << " (clearing)"
+                   << dendl;
+         } else {
+           dout(5) << "RGWLC::process(): ACTIVE entry: " << entry
+                   << " index: " << index << " worker ix: " << worker->ix
+                 << dendl;
+           goto exit;
+         }
+       }
+      }
+    }
+
+    if(!if_already_run_today(head.start_date) ||
+       once) {
       head.start_date = now;
       head.marker.clear();
       ret = bucket_lc_prepare(index, worker);
@@ -1488,38 +1788,46 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
     }
 
     /* termination condition (eof) */
-    if (entry.first.empty())
+    if (entry.bucket.empty())
       goto exit;
 
-    entry.second = lc_processing;
+    ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
+           << " index: " << index << " worker ix: " << worker->ix
+           << dendl;
+
+    entry.status = lc_processing;
     ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
-                              obj_names[index],  entry);
+                              obj_names[index], entry);
     if (ret < 0) {
       ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
-                        << obj_names[index]
-                        << " (" << entry.first << ","
-                        << entry.second << ")"
-                        << dendl;
+             << obj_names[index] << entry.bucket << entry.status << dendl;
       goto exit;
     }
 
-    head.marker = entry.first;
-    ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx, obj_names[index],
-                             head);
+    head.marker = entry.bucket;
+    ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx,
+                             obj_names[index],  head);
     if (ret < 0) {
       ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
                         << obj_names[index]
-                        << dendl;
+             << dendl;
       goto exit;
     }
+
+    ldpp_dout(this, 5) << "RGWLC::process(): START entry 2: " << entry
+           << " index: " << index << " worker ix: " << worker->ix
+           << dendl;
+
     l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
-    ret = bucket_lc_process(entry.first, worker);
+    ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
     bucket_lc_post(index, max_lock_secs, entry, ret, worker);
-  } while(1);
+  } while(1 && !once);
+
+  return 0;
 
 exit:
-    l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
-    return 0;
+  l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
+  return 0;
 }
 
 void RGWLC::start_processor()
@@ -1528,7 +1836,7 @@ void RGWLC::start_processor()
   workers.reserve(maxw);
   for (int ix = 0; ix < maxw; ++ix) {
     auto worker  =
-      std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this);
+      std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this, ix);
     worker->create((string{"lifecycle_thr_"} + to_string(ix)).c_str());
     workers.emplace_back(std::move(worker));
   }
@@ -1623,7 +1931,6 @@ int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
 
 RGWLC::LCWorker::~LCWorker()
 {
-  workpool->drain();
   delete workpool;
 } /* ~LCWorker */
 
@@ -1633,12 +1940,13 @@ void RGWLifecycleConfiguration::generate_test_instances(
   o.push_back(new RGWLifecycleConfiguration);
 }
 
-void get_lc_oid(CephContext *cct, const string& shard_id, string *oid)
+static inline void get_lc_oid(CephContext *cct,
+                             const string& shard_id, string *oid)
 {
   int max_objs =
     (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME :
      cct->_conf->rgw_lc_max_objs);
-  /* XXXX oh noes!!! */
+  /* n.b. review hash algo */
   int index = ceph_str_hash_linux(shard_id.c_str(),
                                  shard_id.size()) % HASH_PRIME % max_objs;
   *oid = lc_oid_prefix;
@@ -1653,9 +1961,9 @@ static std::string get_lc_shard_name(const rgw_bucket& bucket){
 }
 
 template<typename F>
-static int guard_lc_modify(
-  rgw::sal::RGWRadosStore* store, const rgw_bucket& bucket,
-  const string& cookie, const F& f) {
+static int guard_lc_modify(rgw::sal::RGWRadosStore* store,
+                          const rgw_bucket& bucket, const string& cookie,
+                          const F& f) {
   CephContext *cct = store->ctx();
 
   string shard_id = get_lc_shard_name(bucket);
@@ -1663,7 +1971,10 @@ static int guard_lc_modify(
   string oid; 
   get_lc_oid(cct, shard_id, &oid);
 
-  pair<string, int> entry(shard_id, lc_uninitial);
+  /* XXX it makes sense to take shard_id for a bucket_id? */
+  cls_rgw_lc_entry entry;
+  entry.bucket = shard_id;
+  entry.status = lc_uninitial;
   int max_lock_secs = cct->_conf->rgw_lc_lock_max_time;
 
   rados::cls::lock::Lock l(lc_index_lock_name); 
@@ -1716,9 +2027,10 @@ int RGWLC::set_bucket_config(RGWBucketInfo& bucket_info,
 
   rgw_bucket& bucket = bucket_info.bucket;
 
+
   ret = guard_lc_modify(store, bucket, cookie,
                        [&](librados::IoCtx *ctx, const string& oid,
-                           const pair<string, int>& entry) {
+                           const cls_rgw_lc_entry& entry) {
     return cls_rgw_lc_set_entry(*ctx, oid, entry);
   });
 
@@ -1745,7 +2057,7 @@ int RGWLC::remove_bucket_config(RGWBucketInfo& bucket_info,
 
   ret = guard_lc_modify(store, bucket, cookie,
                        [&](librados::IoCtx *ctx, const string& oid,
-                           const pair<string, int>& entry) {
+                           const cls_rgw_lc_entry& entry) {
     return cls_rgw_lc_rm_entry(*ctx, oid, entry);
   });
 
@@ -1773,7 +2085,7 @@ int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store,
   std::string lc_oid;
   get_lc_oid(store->ctx(), shard_name, &lc_oid);
 
-  rgw_lc_entry_t entry;
+  cls_rgw_lc_entry entry;
   // There are multiple cases we need to encounter here
   // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
   // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update
@@ -1797,8 +2109,9 @@ int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store,
 
     ret = guard_lc_modify(
       store, bucket_info.bucket, cookie,
-      [&lc_pool_ctx, &lc_oid](librados::IoCtx *ctx, const string& oid,
-                             const pair<string, int>& entry) {
+      [&lc_pool_ctx, &lc_oid](librados::IoCtx* ctx,
+                             const string& oid,
+                             const cls_rgw_lc_entry& entry) {
        return cls_rgw_lc_set_entry(*lc_pool_ctx, lc_oid, entry);
       });
 
@@ -1953,4 +2266,70 @@ std::string s3_expiration_header(
 
 } /* rgwlc_s3_expiration_header */
 
+bool s3_multipart_abort_header(
+  DoutPrefixProvider* dpp,
+  const rgw_obj_key& obj_key,
+  const ceph::real_time& mtime,
+  const std::map<std::string, buffer::list>& bucket_attrs,
+  ceph::real_time& abort_date,
+  std::string& rule_id)
+{
+  CephContext* cct = dpp->get_cct();
+  RGWLifecycleConfiguration config(cct);
+
+  const auto& aiter = bucket_attrs.find(RGW_ATTR_LC);
+  if (aiter == bucket_attrs.end())
+    return false;
+
+  bufferlist::const_iterator iter{&aiter->second};
+  try {
+    config.decode(iter);
+  } catch (const buffer::error& e) {
+    ldpp_dout(dpp, 0) << __func__
+                      <<  "() decode life cycle config failed"
+                      << dendl;
+    return false;
+  } /* catch */
+
+  std::optional<ceph::real_time> abort_date_tmp;
+  std::optional<std::string_view> rule_id_tmp;
+  const auto& rule_map = config.get_rule_map();
+  for (const auto& ri : rule_map) {
+    const auto& rule = ri.second;
+    const auto& id = rule.get_id();
+    const auto& filter = rule.get_filter();
+    const auto& prefix = filter.has_prefix()?filter.get_prefix():rule.get_prefix();
+    const auto& mp_expiration = rule.get_mp_expiration();
+    if (!rule.is_enabled()) {
+      continue;
+    }
+    if(!prefix.empty() && !boost::starts_with(obj_key.name, prefix)) {
+      continue;
+    }
+
+    std::optional<ceph::real_time> rule_abort_date;
+    if (mp_expiration.has_days()) {
+      rule_abort_date = std::optional<ceph::real_time>(
+              mtime + make_timespan(mp_expiration.get_days()*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60));
+    }
+
+    // update earliest abort date
+    if (rule_abort_date) {
+      if ((! abort_date_tmp) ||
+          (*abort_date_tmp > *rule_abort_date)) {
+        abort_date_tmp =
+                std::optional<ceph::real_time>(rule_abort_date);
+        rule_id_tmp = std::optional<std::string_view>(id);
+      }
+    }
+  }
+  if (abort_date_tmp && rule_id_tmp) {
+    abort_date = *abort_date_tmp;
+    rule_id = *rule_id_tmp;
+    return true;
+  } else {
+    return false;
+  }
+}
+
 } /* namespace rgw::lc */