]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_frontend.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_frontend.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #pragma once
5
6 #include <map>
7 #include <string>
8 #include <vector>
9
10 #include "common/RWLock.h"
11
12 #include "rgw_request.h"
13 #include "rgw_process.h"
14 #include "rgw_process_env.h"
15 #include "rgw_realm_reloader.h"
16
17 #include "rgw_auth_registry.h"
18 #include "rgw_sal_rados.h"
19
20 #define dout_context g_ceph_context
21
// Forward declarations only: these names are introduced as incomplete types
// and their definitions are not part of this header.
namespace rgw::dmclock {
  class SyncScheduler;
  class ClientConfig;
  class SchedulerCtx;
} // namespace rgw::dmclock
27
/**
 * Holds the raw configuration string of one frontend together with the
 * key/value multimap that init() fills from it via parse_config().
 */
class RGWFrontendConfig {
  std::string config;      // raw configuration text passed to the constructor
  std::multimap<std::string, std::string> config_map;  // filled by init()
  std::string framework;   // exposed through get_framework(); not assigned
                           // by any code visible in this header

  // Parses `config` into `config_map`. Defined out of line; init() treats a
  // negative return value as an error code.
  int parse_config(const std::string& config,
                   std::multimap<std::string, std::string>& config_map);

public:
  // Stores the configuration text; nothing is parsed until init() is called.
  explicit RGWFrontendConfig(const std::string& config)
    : config(config) {
  }

  // Parses the stored configuration string into the internal map.
  // Returns the negative value reported by parse_config(), or 0 otherwise.
  int init() {
    const int ret = parse_config(config, config_map);
    return ret < 0 ? ret : 0;
  }

  // Defined out of line.
  void set_default_config(RGWFrontendConfig& def_conf);

  // Lookup by key; defined out of line.
  // NOTE(review): presumably an empty optional means "key not present" --
  // confirm against the implementation.
  std::optional<std::string> get_val(const std::string& key);

  // Lookups with a caller-supplied default and an output parameter; both are
  // defined out of line.
  // NOTE(review): the meaning of the bool result is not visible here.
  bool get_val(const std::string& key,
               const std::string& def_val,
               std::string* out);
  bool get_val(const std::string& key, int def_val, int *out);

  // Convenience wrapper around the three-argument string overload that
  // returns whatever that overload stored in its output parameter.
  std::string get_val(const std::string& key,
                      const std::string& def_val) {
    std::string out;
    get_val(key, def_val, &out);
    return out;
  }

  // Returns the raw configuration text. Const-qualified so that it can be
  // used through const references, matching get_framework().
  const std::string& get_config() const {
    return config;
  }

  // Mutable access to the parsed key/value pairs.
  std::multimap<std::string, std::string>& get_config_map() {
    return config_map;
  }

  // Read-only access to the parsed key/value pairs for const objects.
  const std::multimap<std::string, std::string>& get_config_map() const {
    return config_map;
  }

  // Returns a copy of the framework name.
  std::string get_framework() const {
    return framework;
  }
};
74
/**
 * Abstract frontend interface. Every operation is pure virtual; concrete
 * frontends such as RGWProcessFrontend provide the behaviour.
 */
class RGWFrontend {
public:
  // Virtual so that frontends can be destroyed through a base pointer.
  virtual ~RGWFrontend() = default;

  // Prepare the frontend; returns an int status.
  virtual int init() = 0;

  // Start the frontend; returns an int status.
  virtual int run() = 0;
  // Request the frontend to stop.
  virtual void stop() = 0;
  // Wait for the frontend to finish.
  virtual void join() = 0;

  // Pause / resume hooks, called for every frontend by
  // RGWFrontendPauser::pause() and RGWFrontendPauser::resume().
  virtual void pause_for_new_config() = 0;
  virtual void unpause_with_new_config() = 0;
};
88
89
90 class RGWProcessFrontend : public RGWFrontend {
91 protected:
92 RGWFrontendConfig* conf;
93 RGWProcess* pprocess;
94 RGWProcessEnv& env;
95 RGWProcessControlThread* thread;
96
97 public:
98 RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig* _conf)
99 : conf(_conf), pprocess(nullptr), env(pe), thread(nullptr) {
100 }
101
102 ~RGWProcessFrontend() override {
103 delete thread;
104 delete pprocess;
105 }
106
107 int run() override {
108 ceph_assert(pprocess); /* should have initialized by init() */
109 thread = new RGWProcessControlThread(pprocess);
110 thread->create("rgw_frontend");
111 return 0;
112 }
113
114 void stop() override;
115
116 void join() override {
117 thread->join();
118 }
119
120 void pause_for_new_config() override {
121 pprocess->pause();
122 }
123
124 void unpause_with_new_config() override {
125 pprocess->unpause_with_new_config();
126 }
127 }; /* RGWProcessFrontend */
128
129 class RGWLoadGenFrontend : public RGWProcessFrontend, public DoutPrefixProvider {
130 public:
131 RGWLoadGenFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf)
132 : RGWProcessFrontend(pe, _conf) {}
133
134 CephContext *get_cct() const {
135 return env.driver->ctx();
136 }
137
138 unsigned get_subsys() const
139 {
140 return ceph_subsys_rgw;
141 }
142
143 std::ostream& gen_prefix(std::ostream& out) const
144 {
145 return out << "rgw loadgen frontend: ";
146 }
147
148 int init() override {
149 int num_threads;
150 conf->get_val("num_threads", g_conf()->rgw_thread_pool_size, &num_threads);
151 std::string uri_prefix;
152 conf->get_val("prefix", "", &uri_prefix);
153
154 RGWLoadGenProcess *pp = new RGWLoadGenProcess(
155 g_ceph_context, env, num_threads, std::move(uri_prefix), conf);
156
157 pprocess = pp;
158
159 std::string uid_str;
160 conf->get_val("uid", "", &uid_str);
161 if (uid_str.empty()) {
162 derr << "ERROR: uid param must be specified for loadgen frontend"
163 << dendl;
164 return -EINVAL;
165 }
166
167 rgw_user uid(uid_str);
168 std::unique_ptr<rgw::sal::User> user = env.driver->get_user(uid);
169
170 int ret = user->load_user(this, null_yield);
171 if (ret < 0) {
172 derr << "ERROR: failed reading user info: uid=" << uid << " ret="
173 << ret << dendl;
174 return ret;
175 }
176
177 auto aiter = user->get_info().access_keys.begin();
178 if (aiter == user->get_info().access_keys.end()) {
179 derr << "ERROR: user has no S3 access keys set" << dendl;
180 return -EINVAL;
181 }
182
183 pp->set_access_key(aiter->second);
184
185 return 0;
186 }
187 }; /* RGWLoadGenFrontend */
188
189 // FrontendPauser implementation for RGWRealmReloader
190 class RGWFrontendPauser : public RGWRealmReloader::Pauser {
191 std::vector<RGWFrontend*> &frontends;
192 RGWRealmReloader::Pauser* pauser;
193
194 public:
195 RGWFrontendPauser(std::vector<RGWFrontend*> &frontends,
196 RGWRealmReloader::Pauser* pauser = nullptr)
197 : frontends(frontends), pauser(pauser) {}
198
199 void pause() override {
200 for (auto frontend : frontends)
201 frontend->pause_for_new_config();
202 if (pauser)
203 pauser->pause();
204 }
205 void resume(rgw::sal::Driver* driver) override {
206 for (auto frontend : frontends)
207 frontend->unpause_with_new_config();
208 if (pauser)
209 pauser->resume(driver);
210 }
211 };