]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_frontend.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_frontend.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef RGW_FRONTEND_H
5 #define RGW_FRONTEND_H
6
7 #include <map>
8 #include <string>
9
10 #include "rgw_request.h"
11 #include "rgw_process.h"
12 #include "rgw_realm_reloader.h"
13
14 #include "rgw_civetweb.h"
15 #include "rgw_civetweb_log.h"
16 #include "civetweb/civetweb.h"
17 #include "rgw_auth_registry.h"
18
19 #define dout_context g_ceph_context
20 #define dout_subsys ceph_subsys_rgw
21
22 namespace rgw::dmclock {
23 class SyncScheduler;
24 class ClientConfig;
25 class SchedulerCtx;
26 }
27
28 class RGWFrontendConfig {
29 std::string config;
30 std::multimap<std::string, std::string> config_map;
31 std::string framework;
32
33 int parse_config(const std::string& config,
34 std::multimap<std::string, std::string>& config_map);
35
36 public:
37 explicit RGWFrontendConfig(const std::string& config)
38 : config(config) {
39 }
40
41 int init() {
42 const int ret = parse_config(config, config_map);
43 return ret < 0 ? ret : 0;
44 }
45
46 void set_default_config(RGWFrontendConfig& def_conf);
47
48 std::optional<string> get_val(const std::string& key);
49
50 bool get_val(const std::string& key,
51 const std::string& def_val,
52 std::string* out);
53 bool get_val(const std::string& key, int def_val, int *out);
54
55 std::string get_val(const std::string& key,
56 const std::string& def_val) {
57 std::string out;
58 get_val(key, def_val, &out);
59 return out;
60 }
61
62 const std::string& get_config() {
63 return config;
64 }
65
66 std::multimap<std::string, std::string>& get_config_map() {
67 return config_map;
68 }
69
70 std::string get_framework() const {
71 return framework;
72 }
73 };
74
75 class RGWFrontend {
76 public:
77 virtual ~RGWFrontend() {}
78
79 virtual int init() = 0;
80
81 virtual int run() = 0;
82 virtual void stop() = 0;
83 virtual void join() = 0;
84
85 virtual void pause_for_new_config() = 0;
86 virtual void unpause_with_new_config(rgw::sal::RGWRadosStore* store,
87 rgw_auth_registry_ptr_t auth_registry) = 0;
88 };
89
90
91 struct RGWMongooseEnv : public RGWProcessEnv {
92 // every request holds a read lock, so we need to prioritize write locks to
93 // avoid starving pause_for_new_config()
94 static constexpr bool prioritize_write = true;
95 RWLock mutex;
96
97 explicit RGWMongooseEnv(const RGWProcessEnv &env)
98 : RGWProcessEnv(env),
99 mutex("RGWCivetWebFrontend", false, true, prioritize_write) {
100 }
101 };
102
103
104 class RGWCivetWebFrontend : public RGWFrontend {
105 RGWFrontendConfig* conf;
106 struct mg_context* ctx;
107 RGWMongooseEnv env;
108
109 std::unique_ptr<rgw::dmclock::SyncScheduler> scheduler;
110 std::unique_ptr<rgw::dmclock::ClientConfig> client_config;
111
112 void set_conf_default(std::multimap<std::string, std::string>& m,
113 const std::string& key,
114 const std::string& def_val) {
115 if (m.find(key) == std::end(m)) {
116 m.emplace(key, def_val);
117 }
118 }
119
120 CephContext* cct() const { return env.store->ctx(); }
121 public:
122 RGWCivetWebFrontend(RGWProcessEnv& env,
123 RGWFrontendConfig *conf,
124 rgw::dmclock::SchedulerCtx& sched_ctx);
125
126 int init() override {
127 return 0;
128 }
129
130 int run() override;
131
132 int process(struct mg_connection* conn);
133
134 void stop() override {
135 if (ctx) {
136 mg_stop(ctx);
137 }
138 }
139
140 void join() override {
141 return;
142 }
143
144 void pause_for_new_config() override {
145 // block callbacks until unpause
146 env.mutex.get_write();
147 }
148
149 void unpause_with_new_config(rgw::sal::RGWRadosStore* const store,
150 rgw_auth_registry_ptr_t auth_registry) override {
151 env.store = store;
152 env.auth_registry = std::move(auth_registry);
153 // unpause callbacks
154 env.mutex.put_write();
155 }
156 }; /* RGWCivetWebFrontend */
157
158 class RGWProcessFrontend : public RGWFrontend {
159 protected:
160 RGWFrontendConfig* conf;
161 RGWProcess* pprocess;
162 RGWProcessEnv env;
163 RGWProcessControlThread* thread;
164
165 public:
166 RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig* _conf)
167 : conf(_conf), pprocess(nullptr), env(pe), thread(nullptr) {
168 }
169
170 ~RGWProcessFrontend() override {
171 delete thread;
172 delete pprocess;
173 }
174
175 int run() override {
176 ceph_assert(pprocess); /* should have initialized by init() */
177 thread = new RGWProcessControlThread(pprocess);
178 thread->create("rgw_frontend");
179 return 0;
180 }
181
182 void stop() override;
183
184 void join() override {
185 thread->join();
186 }
187
188 void pause_for_new_config() override {
189 pprocess->pause();
190 }
191
192 void unpause_with_new_config(rgw::sal::RGWRadosStore* const store,
193 rgw_auth_registry_ptr_t auth_registry) override {
194 env.store = store;
195 env.auth_registry = auth_registry;
196 pprocess->unpause_with_new_config(store, std::move(auth_registry));
197 }
198 }; /* RGWProcessFrontend */
199
200 class RGWFCGXFrontend : public RGWProcessFrontend {
201 public:
202 RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig* _conf)
203 : RGWProcessFrontend(pe, _conf) {}
204
205 int init() override {
206 pprocess = new RGWFCGXProcess(g_ceph_context, &env,
207 g_conf()->rgw_thread_pool_size, conf);
208 return 0;
209 }
210 }; /* RGWFCGXFrontend */
211
212 class RGWLoadGenFrontend : public RGWProcessFrontend {
213 public:
214 RGWLoadGenFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf)
215 : RGWProcessFrontend(pe, _conf) {}
216
217 int init() override {
218 int num_threads;
219 conf->get_val("num_threads", g_conf()->rgw_thread_pool_size, &num_threads);
220 RGWLoadGenProcess *pp = new RGWLoadGenProcess(g_ceph_context, &env,
221 num_threads, conf);
222
223 pprocess = pp;
224
225 string uid_str;
226 conf->get_val("uid", "", &uid_str);
227 if (uid_str.empty()) {
228 derr << "ERROR: uid param must be specified for loadgen frontend"
229 << dendl;
230 return -EINVAL;
231 }
232
233 rgw_user uid(uid_str);
234
235 RGWUserInfo user_info;
236 int ret = env.store->ctl()->user->get_info_by_uid(uid, &user_info, null_yield);
237 if (ret < 0) {
238 derr << "ERROR: failed reading user info: uid=" << uid << " ret="
239 << ret << dendl;
240 return ret;
241 }
242
243 map<string, RGWAccessKey>::iterator aiter = user_info.access_keys.begin();
244 if (aiter == user_info.access_keys.end()) {
245 derr << "ERROR: user has no S3 access keys set" << dendl;
246 return -EINVAL;
247 }
248
249 pp->set_access_key(aiter->second);
250
251 return 0;
252 }
253 }; /* RGWLoadGenFrontend */
254
255 // FrontendPauser implementation for RGWRealmReloader
256 class RGWFrontendPauser : public RGWRealmReloader::Pauser {
257 std::list<RGWFrontend*> &frontends;
258 RGWRealmReloader::Pauser* pauser;
259 rgw::auth::ImplicitTenants& implicit_tenants;
260
261 public:
262 RGWFrontendPauser(std::list<RGWFrontend*> &frontends,
263 rgw::auth::ImplicitTenants& implicit_tenants,
264 RGWRealmReloader::Pauser* pauser = nullptr)
265 : frontends(frontends),
266 pauser(pauser),
267 implicit_tenants(implicit_tenants) {
268 }
269
270 void pause() override {
271 for (auto frontend : frontends)
272 frontend->pause_for_new_config();
273 if (pauser)
274 pauser->pause();
275 }
276 void resume(rgw::sal::RGWRadosStore *store) override {
277 /* Initialize the registry of auth strategies which will coordinate
278 * the dynamic reconfiguration. */
279 auto auth_registry = \
280 rgw::auth::StrategyRegistry::create(g_ceph_context, implicit_tenants, store->getRados()->pctl);
281
282 for (auto frontend : frontends)
283 frontend->unpause_with_new_config(store, auth_registry);
284 if (pauser)
285 pauser->resume(store);
286 }
287 };
288
289 #endif /* RGW_FRONTEND_H */