]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_process.h
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / rgw_process.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef RGW_PROCESS_H
5 #define RGW_PROCESS_H
6
7 #include "rgw_common.h"
8 #include "rgw_acl.h"
9 #include "rgw_auth_registry.h"
10 #include "rgw_user.h"
11 #include "rgw_op.h"
12 #include "rgw_rest.h"
13
14 #include "include/ceph_assert.h"
15
16 #include "common/WorkQueue.h"
17 #include "common/Throttle.h"
18
19 #include <atomic>
20
/* Provide a default logging subsystem for this header only when the includer
 * has not chosen one. def_dout_subsys records that the default was installed
 * here so that it can be removed again at the end of the header. */
#ifndef dout_subsys
#define dout_subsys ceph_subsys_rgw
#define def_dout_subsys
#endif

/* Logging context used inside this header; undefined again at the bottom. */
#define dout_context g_ceph_context
27
28 extern void signal_shutdown();
29
30 namespace rgw::dmclock {
31 class Scheduler;
32 }
33
34 struct RGWProcessEnv {
35 rgw::sal::RGWRadosStore *store;
36 RGWREST *rest;
37 OpsLogSocket *olog;
38 int port;
39 std::string uri_prefix;
40 std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry;
41 };
42
43 class RGWFrontendConfig;
44 class RGWRequest;
45
46 class RGWProcess {
47 deque<RGWRequest*> m_req_queue;
48 protected:
49 CephContext *cct;
50 rgw::sal::RGWRadosStore* store;
51 rgw_auth_registry_ptr_t auth_registry;
52 OpsLogSocket* olog;
53 ThreadPool m_tp;
54 Throttle req_throttle;
55 RGWREST* rest;
56 RGWFrontendConfig* conf;
57 int sock_fd;
58 std::string uri_prefix;
59
60 struct RGWWQ : public DoutPrefixProvider, public ThreadPool::WorkQueue<RGWRequest> {
61 RGWProcess* process;
62 RGWWQ(RGWProcess* p, ceph::timespan timeout, ceph::timespan suicide_timeout,
63 ThreadPool* tp)
64 : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
65 tp), process(p) {}
66
67 bool _enqueue(RGWRequest* req) override;
68
69 void _dequeue(RGWRequest* req) override {
70 ceph_abort();
71 }
72
73 bool _empty() override {
74 return process->m_req_queue.empty();
75 }
76
77 RGWRequest* _dequeue() override;
78
79 using ThreadPool::WorkQueue<RGWRequest>::_process;
80
81 void _process(RGWRequest *req, ThreadPool::TPHandle &) override;
82
83 void _dump_queue();
84
85 void _clear() override {
86 ceph_assert(process->m_req_queue.empty());
87 }
88
89 CephContext *get_cct() const override { return process->cct; }
90 unsigned get_subsys() const { return ceph_subsys_rgw; }
91 std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw request work queue: ";}
92
93 } req_wq;
94
95 public:
96 RGWProcess(CephContext* const cct,
97 RGWProcessEnv* const pe,
98 const int num_threads,
99 RGWFrontendConfig* const conf)
100 : cct(cct),
101 store(pe->store),
102 auth_registry(pe->auth_registry),
103 olog(pe->olog),
104 m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
105 req_throttle(cct, "rgw_ops", num_threads * 2),
106 rest(pe->rest),
107 conf(conf),
108 sock_fd(-1),
109 uri_prefix(pe->uri_prefix),
110 req_wq(this,
111 ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
112 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
113 &m_tp) {
114 }
115
116 virtual ~RGWProcess() = default;
117
118 virtual void run() = 0;
119 virtual void handle_request(const DoutPrefixProvider *dpp, RGWRequest *req) = 0;
120
121 void pause() {
122 m_tp.pause();
123 }
124
125 void unpause_with_new_config(rgw::sal::RGWRadosStore* const store,
126 rgw_auth_registry_ptr_t auth_registry) {
127 this->store = store;
128 this->auth_registry = std::move(auth_registry);
129 m_tp.unpause();
130 }
131
132 void close_fd() {
133 if (sock_fd >= 0) {
134 ::close(sock_fd);
135 sock_fd = -1;
136 }
137 }
138 }; /* RGWProcess */
139
140 class RGWFCGXProcess : public RGWProcess {
141 int max_connections;
142 public:
143
144 /* have a bit more connections than threads so that requests are
145 * still accepted even if we're still processing older requests */
146 RGWFCGXProcess(CephContext* const cct,
147 RGWProcessEnv* const pe,
148 const int num_threads,
149 RGWFrontendConfig* const conf)
150 : RGWProcess(cct, pe, num_threads, conf),
151 max_connections(num_threads + (num_threads >> 3)) {
152 }
153
154 void run() override;
155 void handle_request(const DoutPrefixProvider *dpp, RGWRequest* req) override;
156 };
157
158 class RGWProcessControlThread : public Thread {
159 RGWProcess *pprocess;
160 public:
161 explicit RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
162
163 void *entry() override {
164 pprocess->run();
165 return NULL;
166 }
167 };
168
169 class RGWLoadGenProcess : public RGWProcess {
170 RGWAccessKey access_key;
171 public:
172 RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
173 RGWFrontendConfig* _conf) :
174 RGWProcess(cct, pe, num_threads, _conf) {}
175 void run() override;
176 void checkpoint();
177 void handle_request(const DoutPrefixProvider *dpp, RGWRequest* req) override;
178 void gen_request(const string& method, const string& resource,
179 int content_length, std::atomic<bool>* fail_flag);
180
181 void set_access_key(RGWAccessKey& key) { access_key = key; }
182 };
183 /* process stream request */
184 extern int process_request(rgw::sal::RGWRadosStore* store,
185 RGWREST* rest,
186 RGWRequest* req,
187 const std::string& frontend_prefix,
188 const rgw_auth_registry_t& auth_registry,
189 RGWRestfulIO* client_io,
190 OpsLogSocket* olog,
191 optional_yield y,
192 rgw::dmclock::Scheduler *scheduler,
193 std::string* user,
194 ceph::coarse_real_clock::duration* latency,
195 int* http_ret = nullptr);
196
197 extern int rgw_process_authenticated(RGWHandler_REST* handler,
198 RGWOp*& op,
199 RGWRequest* req,
200 req_state* s,
201 optional_yield y,
202 bool skip_retarget = false);
203
/* Remove the default logging subsystem again, but only if this header was
 * the one that installed it (signalled by def_dout_subsys). */
#ifdef def_dout_subsys
#undef def_dout_subsys
#undef dout_subsys
#endif

/* The logging context macro is always dropped at the end of the header. */
#undef dout_context
209
210 #endif /* RGW_PROCESS_H */