X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Frgw%2Frgw_lc.h;h=57f02a63e81f61f5046b2ddf015d812a93e1628b;hb=e306af509c4d4816a1f73b17a825ea5186fa0030;hp=32de9581664cd143ebc98b224946a39f48d96ee7;hpb=5164c6f75c1d9ca9e83f6172e1cd48f9ab584acf;p=ceph.git diff --git a/ceph/src/rgw/rgw_lc.h b/ceph/src/rgw/rgw_lc.h index 32de95816..57f02a63e 100644 --- a/ceph/src/rgw/rgw_lc.h +++ b/ceph/src/rgw/rgw_lc.h @@ -458,39 +458,49 @@ class RGWLC : public DoutPrefixProvider { std::atomic down_flag = { false }; string cookie; - class LCWorker : public Thread { +public: + + class WorkPool; + + class LCWorker : public Thread + { const DoutPrefixProvider *dpp; CephContext *cct; RGWLC *lc; ceph::mutex lock = ceph::make_mutex("LCWorker"); ceph::condition_variable cond; + WorkPool* workpool{nullptr}; public: - LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWLC *_lc) : dpp(_dpp), cct(_cct), lc(_lc) {} + LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWLC *_lc); + RGWLC* get_lc() { return lc; } void *entry() override; void stop(); bool should_work(utime_t& now); int schedule_next_start_time(utime_t& start, utime_t& now); - }; - - public: - LCWorker *worker; - RGWLC() : cct(NULL), store(NULL), worker(NULL) {} - ~RGWLC() { - stop_processor(); - finalize(); - } + ~LCWorker(); + + friend class RGWRados; + friend class RGWLC; + }; /* LCWorker */ + + friend class RGWRados; + + std::vector> workers; + + RGWLC() : cct(nullptr), store(nullptr) {} + ~RGWLC(); void initialize(CephContext *_cct, rgw::sal::RGWRadosStore *_store); void finalize(); - int process(); - int process(int index, int max_secs); + int process(LCWorker* worker); + int process(int index, int max_secs, LCWorker* worker); bool if_already_run_today(time_t& start_date); int list_lc_progress(const string& marker, uint32_t max_entries, map *progress_map); - int bucket_lc_prepare(int index); - int bucket_lc_process(string& shard_id); - int bucket_lc_post(int index, int max_lock_sec, pair& entry, int& result); + int bucket_lc_prepare(int index, LCWorker* worker); + int bucket_lc_process(string& shard_id, LCWorker* worker); + int bucket_lc_post(int index, int max_lock_sec, pair& entry, int& result, LCWorker* worker); bool going_down(); void start_processor(); void stop_processor(); @@ -507,7 +517,8 @@ class RGWLC : public DoutPrefixProvider { private: int handle_multipart_expiration(RGWRados::Bucket *target, - const multimap& prefix_map); + const multimap& prefix_map, + LCWorker* worker); }; namespace rgw::lc {