std::atomic<bool> down_flag = { false };
string cookie;
- class LCWorker : public Thread {
+public:
+
+ class WorkPool;
+
+ class LCWorker : public Thread
+ {
const DoutPrefixProvider *dpp;
CephContext *cct;
RGWLC *lc;
ceph::mutex lock = ceph::make_mutex("LCWorker");
ceph::condition_variable cond;
+ WorkPool* workpool{nullptr};
public:
- LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWLC *_lc) : dpp(_dpp), cct(_cct), lc(_lc) {}
+ LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWLC *_lc);
+ RGWLC* get_lc() { return lc; }
void *entry() override;
void stop();
bool should_work(utime_t& now);
int schedule_next_start_time(utime_t& start, utime_t& now);
- };
-
- public:
- LCWorker *worker;
- RGWLC() : cct(NULL), store(NULL), worker(NULL) {}
- ~RGWLC() {
- stop_processor();
- finalize();
- }
+ ~LCWorker();
+
+ friend class RGWRados;
+ friend class RGWLC;
+ }; /* LCWorker */
+
+ friend class RGWRados;
+
+ std::vector<std::unique_ptr<RGWLC::LCWorker>> workers;
+
+ RGWLC() : cct(nullptr), store(nullptr) {}
+ ~RGWLC();
void initialize(CephContext *_cct, rgw::sal::RGWRadosStore *_store);
void finalize();
- int process();
- int process(int index, int max_secs);
+ int process(LCWorker* worker);
+ int process(int index, int max_secs, LCWorker* worker);
bool if_already_run_today(time_t& start_date);
int list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map);
- int bucket_lc_prepare(int index);
- int bucket_lc_process(string& shard_id);
- int bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result);
+ int bucket_lc_prepare(int index, LCWorker* worker);
+ int bucket_lc_process(string& shard_id, LCWorker* worker);
+ int bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result, LCWorker* worker);
bool going_down();
void start_processor();
void stop_processor();
private:
int handle_multipart_expiration(RGWRados::Bucket *target,
- const multimap<string, lc_op>& prefix_map);
+ const multimap<string, lc_op>& prefix_map,
+ LCWorker* worker);
};
namespace rgw::lc {