]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_process.h
0afea681c2c1050271d65f2b096e951970153fa3
[ceph.git] / ceph / src / rgw / rgw_process.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef RGW_PROCESS_H
5 #define RGW_PROCESS_H
6
7 #include "rgw_common.h"
8 #include "rgw_acl.h"
9 #include "rgw_auth_registry.h"
10 #include "rgw_user.h"
11 #include "rgw_op.h"
12 #include "rgw_rest.h"
13
14 #include "include/ceph_assert.h"
15
16 #include "common/WorkQueue.h"
17 #include "common/Throttle.h"
18
19 #include <atomic>
20
21 #if !defined(dout_subsys)
22 #define dout_subsys ceph_subsys_rgw
23 #define def_dout_subsys
24 #endif
25
26 #define dout_context g_ceph_context
27
28 extern void signal_shutdown();
29
30 namespace rgw::dmclock {
31 class Scheduler;
32 }
33
34 struct RGWProcessEnv {
35 rgw::sal::RGWRadosStore *store;
36 RGWREST *rest;
37 OpsLogSocket *olog;
38 int port;
39 std::string uri_prefix;
40 std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry;
41 };
42
43 class RGWFrontendConfig;
44 class RGWRequest;
45
46 class RGWProcess {
47 deque<RGWRequest*> m_req_queue;
48 protected:
49 CephContext *cct;
50 rgw::sal::RGWRadosStore* store;
51 rgw_auth_registry_ptr_t auth_registry;
52 OpsLogSocket* olog;
53 ThreadPool m_tp;
54 Throttle req_throttle;
55 RGWREST* rest;
56 RGWFrontendConfig* conf;
57 int sock_fd;
58 std::string uri_prefix;
59
60 struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
61 RGWProcess* process;
62 RGWWQ(RGWProcess* p, ceph::timespan timeout, ceph::timespan suicide_timeout,
63 ThreadPool* tp)
64 : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
65 tp), process(p) {}
66
67 bool _enqueue(RGWRequest* req) override;
68
69 void _dequeue(RGWRequest* req) override {
70 ceph_abort();
71 }
72
73 bool _empty() override {
74 return process->m_req_queue.empty();
75 }
76
77 RGWRequest* _dequeue() override;
78
79 using ThreadPool::WorkQueue<RGWRequest>::_process;
80
81 void _process(RGWRequest *req, ThreadPool::TPHandle &) override;
82
83 void _dump_queue();
84
85 void _clear() override {
86 ceph_assert(process->m_req_queue.empty());
87 }
88 } req_wq;
89
90 public:
91 RGWProcess(CephContext* const cct,
92 RGWProcessEnv* const pe,
93 const int num_threads,
94 RGWFrontendConfig* const conf)
95 : cct(cct),
96 store(pe->store),
97 auth_registry(pe->auth_registry),
98 olog(pe->olog),
99 m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
100 req_throttle(cct, "rgw_ops", num_threads * 2),
101 rest(pe->rest),
102 conf(conf),
103 sock_fd(-1),
104 uri_prefix(pe->uri_prefix),
105 req_wq(this,
106 ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
107 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
108 &m_tp) {
109 }
110
111 virtual ~RGWProcess() = default;
112
113 virtual void run() = 0;
114 virtual void handle_request(RGWRequest *req) = 0;
115
116 void pause() {
117 m_tp.pause();
118 }
119
120 void unpause_with_new_config(rgw::sal::RGWRadosStore* const store,
121 rgw_auth_registry_ptr_t auth_registry) {
122 this->store = store;
123 this->auth_registry = std::move(auth_registry);
124 m_tp.unpause();
125 }
126
127 void close_fd() {
128 if (sock_fd >= 0) {
129 ::close(sock_fd);
130 sock_fd = -1;
131 }
132 }
133 }; /* RGWProcess */
134
135 class RGWFCGXProcess : public RGWProcess {
136 int max_connections;
137 public:
138
139 /* have a bit more connections than threads so that requests are
140 * still accepted even if we're still processing older requests */
141 RGWFCGXProcess(CephContext* const cct,
142 RGWProcessEnv* const pe,
143 const int num_threads,
144 RGWFrontendConfig* const conf)
145 : RGWProcess(cct, pe, num_threads, conf),
146 max_connections(num_threads + (num_threads >> 3)) {
147 }
148
149 void run() override;
150 void handle_request(RGWRequest* req) override;
151 };
152
153 class RGWProcessControlThread : public Thread {
154 RGWProcess *pprocess;
155 public:
156 explicit RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
157
158 void *entry() override {
159 pprocess->run();
160 return NULL;
161 }
162 };
163
164 class RGWLoadGenProcess : public RGWProcess {
165 RGWAccessKey access_key;
166 public:
167 RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
168 RGWFrontendConfig* _conf) :
169 RGWProcess(cct, pe, num_threads, _conf) {}
170 void run() override;
171 void checkpoint();
172 void handle_request(RGWRequest* req) override;
173 void gen_request(const string& method, const string& resource,
174 int content_length, std::atomic<bool>* fail_flag);
175
176 void set_access_key(RGWAccessKey& key) { access_key = key; }
177 };
178 /* process stream request */
179 extern int process_request(rgw::sal::RGWRadosStore* store,
180 RGWREST* rest,
181 RGWRequest* req,
182 const std::string& frontend_prefix,
183 const rgw_auth_registry_t& auth_registry,
184 RGWRestfulIO* client_io,
185 OpsLogSocket* olog,
186 optional_yield y,
187 rgw::dmclock::Scheduler *scheduler,
188 std::string* user,
189 ceph::coarse_real_clock::duration* latency,
190 int* http_ret = nullptr);
191
192 extern int rgw_process_authenticated(RGWHandler_REST* handler,
193 RGWOp*& op,
194 RGWRequest* req,
195 req_state* s,
196 optional_yield y,
197 bool skip_retarget = false);
198
199 #if defined(def_dout_subsys)
200 #undef def_dout_subsys
201 #undef dout_subsys
202 #endif
203 #undef dout_context
204
205 #endif /* RGW_PROCESS_H */