git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_process.h
update sources to v12.1.0
[ceph.git] / ceph / src / rgw / rgw_process.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef RGW_PROCESS_H
5 #define RGW_PROCESS_H
6
7 #include "rgw_common.h"
8 #include "rgw_rados.h"
9 #include "rgw_acl.h"
10 #include "rgw_auth_registry.h"
11 #include "rgw_user.h"
12 #include "rgw_op.h"
13 #include "rgw_rest.h"
14
15 #include "include/assert.h"
16
17 #include "common/WorkQueue.h"
18 #include "common/Throttle.h"
19
20 #include <atomic>
21
/*
 * Logging macros for the dout() calls made inline in this header.
 * dout_subsys is supplied only when the including file has not already
 * chosen one; def_dout_subsys records that it came from here so that the
 * cleanup at the end of this header removes it again.
 */
#ifndef dout_subsys
#define dout_subsys ceph_subsys_rgw
#define def_dout_subsys
#endif

/* Defined unconditionally here and #undef'd again at the end of this header. */
#define dout_context g_ceph_context
28
/* Declaration only; the definition lives outside this header. */
void signal_shutdown();
30
31 struct RGWProcessEnv {
32 RGWRados *store;
33 RGWREST *rest;
34 OpsLogSocket *olog;
35 int port;
36 std::string uri_prefix;
37 std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry;
38 };
39
40 class RGWFrontendConfig;
41
42 class RGWProcess {
43 deque<RGWRequest*> m_req_queue;
44 protected:
45 CephContext *cct;
46 RGWRados* store;
47 rgw_auth_registry_ptr_t auth_registry;
48 OpsLogSocket* olog;
49 ThreadPool m_tp;
50 Throttle req_throttle;
51 RGWREST* rest;
52 RGWFrontendConfig* conf;
53 int sock_fd;
54 std::string uri_prefix;
55
56 struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
57 RGWProcess* process;
58 RGWWQ(RGWProcess* p, time_t timeout, time_t suicide_timeout, ThreadPool* tp)
59 : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
60 tp), process(p) {}
61
62 bool _enqueue(RGWRequest* req) override {
63 process->m_req_queue.push_back(req);
64 perfcounter->inc(l_rgw_qlen);
65 dout(20) << "enqueued request req=" << hex << req << dec << dendl;
66 _dump_queue();
67 return true;
68 }
69
70 void _dequeue(RGWRequest* req) override {
71 ceph_abort();
72 }
73
74 bool _empty() override {
75 return process->m_req_queue.empty();
76 }
77
78 RGWRequest* _dequeue() override {
79 if (process->m_req_queue.empty())
80 return NULL;
81 RGWRequest *req = process->m_req_queue.front();
82 process->m_req_queue.pop_front();
83 dout(20) << "dequeued request req=" << hex << req << dec << dendl;
84 _dump_queue();
85 perfcounter->inc(l_rgw_qlen, -1);
86 return req;
87 }
88
89 using ThreadPool::WorkQueue<RGWRequest>::_process;
90
91 void _process(RGWRequest *req, ThreadPool::TPHandle &) override {
92 perfcounter->inc(l_rgw_qactive);
93 process->handle_request(req);
94 process->req_throttle.put(1);
95 perfcounter->inc(l_rgw_qactive, -1);
96 }
97
98 void _dump_queue();
99
100 void _clear() override {
101 assert(process->m_req_queue.empty());
102 }
103 } req_wq;
104
105 public:
106 RGWProcess(CephContext* const cct,
107 RGWProcessEnv* const pe,
108 const int num_threads,
109 RGWFrontendConfig* const conf)
110 : cct(cct),
111 store(pe->store),
112 auth_registry(pe->auth_registry),
113 olog(pe->olog),
114 m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
115 req_throttle(cct, "rgw_ops", num_threads * 2),
116 rest(pe->rest),
117 conf(conf),
118 sock_fd(-1),
119 uri_prefix(pe->uri_prefix),
120 req_wq(this, g_conf->rgw_op_thread_timeout,
121 g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
122 }
123
124 virtual ~RGWProcess() = default;
125
126 virtual void run() = 0;
127 virtual void handle_request(RGWRequest *req) = 0;
128
129 void pause() {
130 m_tp.pause();
131 }
132
133 void unpause_with_new_config(RGWRados* const store,
134 rgw_auth_registry_ptr_t auth_registry) {
135 this->store = store;
136 this->auth_registry = std::move(auth_registry);
137 m_tp.unpause();
138 }
139
140 void close_fd() {
141 if (sock_fd >= 0) {
142 ::close(sock_fd);
143 sock_fd = -1;
144 }
145 }
146 }; /* RGWProcess */
147
148 class RGWFCGXProcess : public RGWProcess {
149 int max_connections;
150 public:
151
152 /* have a bit more connections than threads so that requests are
153 * still accepted even if we're still processing older requests */
154 RGWFCGXProcess(CephContext* const cct,
155 RGWProcessEnv* const pe,
156 const int num_threads,
157 RGWFrontendConfig* const conf)
158 : RGWProcess(cct, pe, num_threads, conf),
159 max_connections(num_threads + (num_threads >> 3)) {
160 }
161
162 void run() override;
163 void handle_request(RGWRequest* req) override;
164 };
165
166 class RGWProcessControlThread : public Thread {
167 RGWProcess *pprocess;
168 public:
169 RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
170
171 void *entry() override {
172 pprocess->run();
173 return NULL;
174 }
175 };
176
177 class RGWLoadGenProcess : public RGWProcess {
178 RGWAccessKey access_key;
179 public:
180 RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
181 RGWFrontendConfig* _conf) :
182 RGWProcess(cct, pe, num_threads, _conf) {}
183 void run() override;
184 void checkpoint();
185 void handle_request(RGWRequest* req) override;
186 void gen_request(const string& method, const string& resource,
187 int content_length, std::atomic<bool>* fail_flag);
188
189 void set_access_key(RGWAccessKey& key) { access_key = key; }
190 };
191
192 /* process stream request */
193 extern int process_request(RGWRados* store,
194 RGWREST* rest,
195 RGWRequest* req,
196 const std::string& frontend_prefix,
197 const rgw_auth_registry_t& auth_registry,
198 RGWRestfulIO* client_io,
199 OpsLogSocket* olog);
200
201 extern int rgw_process_authenticated(RGWHandler_REST* handler,
202 RGWOp*& op,
203 RGWRequest* req,
204 req_state* s,
205 bool skip_retarget = false);
206
/*
 * Withdraw the logging macros set up at the top of this header.
 * dout_subsys is removed only when this header was the one that defined it
 * (signalled by def_dout_subsys); dout_context is removed unconditionally.
 */
#ifdef def_dout_subsys
#undef def_dout_subsys
#undef dout_subsys
#endif
#undef dout_context
212
213 #endif /* RGW_PROCESS_H */