]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_process.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_process.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef RGW_PROCESS_H
5 #define RGW_PROCESS_H
6
7 #include "rgw_common.h"
8 #include "rgw_acl.h"
9 #include "rgw_auth_registry.h"
10 #include "rgw_user.h"
11 #include "rgw_op.h"
12 #include "rgw_rest.h"
13 #include "rgw_ratelimit.h"
14 #include "include/ceph_assert.h"
15
16 #include "common/WorkQueue.h"
17 #include "common/Throttle.h"
18
19 #include <atomic>
20
// Give this header a logging subsystem only when the includer has not chosen
// one. def_dout_subsys records that the definition was made here, which lets
// the matching block at the bottom of the header remove it again.
#ifndef dout_subsys
#define dout_subsys ceph_subsys_rgw
#define def_dout_subsys
#endif

// Logging context used inside this header; #undef'ed again before the
// closing include guard.
#define dout_context g_ceph_context
27
// Declaration only; this header does not define it.
void signal_shutdown();

// Forward declaration: this header only refers to Scheduler through a pointer.
namespace rgw::dmclock { class Scheduler; }
33
34 struct RGWProcessEnv {
35 rgw::sal::Store* store;
36 RGWREST *rest;
37 OpsLogSink *olog;
38 int port;
39 std::string uri_prefix;
40 std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry;
41 //maybe there is a better place to store the rate limit data structure
42 ActiveRateLimiter* ratelimiting;
43 };
44
// Forward declarations: the classes below only hold or pass pointers to these
// two types.
class RGWRequest;
class RGWFrontendConfig;
47
48 class RGWProcess {
49 std::deque<RGWRequest*> m_req_queue;
50 protected:
51 CephContext *cct;
52 rgw::sal::Store* store;
53 rgw_auth_registry_ptr_t auth_registry;
54 OpsLogSink* olog;
55 ThreadPool m_tp;
56 Throttle req_throttle;
57 RGWREST* rest;
58 RGWFrontendConfig* conf;
59 int sock_fd;
60 std::string uri_prefix;
61
62 struct RGWWQ : public DoutPrefixProvider, public ThreadPool::WorkQueue<RGWRequest> {
63 RGWProcess* process;
64 RGWWQ(RGWProcess* p, ceph::timespan timeout, ceph::timespan suicide_timeout,
65 ThreadPool* tp)
66 : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
67 tp), process(p) {}
68
69 bool _enqueue(RGWRequest* req) override;
70
71 void _dequeue(RGWRequest* req) override {
72 ceph_abort();
73 }
74
75 bool _empty() override {
76 return process->m_req_queue.empty();
77 }
78
79 RGWRequest* _dequeue() override;
80
81 using ThreadPool::WorkQueue<RGWRequest>::_process;
82
83 void _process(RGWRequest *req, ThreadPool::TPHandle &) override;
84
85 void _dump_queue();
86
87 void _clear() override {
88 ceph_assert(process->m_req_queue.empty());
89 }
90
91 CephContext *get_cct() const override { return process->cct; }
92 unsigned get_subsys() const { return ceph_subsys_rgw; }
93 std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw request work queue: ";}
94
95 } req_wq;
96
97 public:
98 RGWProcess(CephContext* const cct,
99 RGWProcessEnv* const pe,
100 const int num_threads,
101 RGWFrontendConfig* const conf)
102 : cct(cct),
103 store(pe->store),
104 auth_registry(pe->auth_registry),
105 olog(pe->olog),
106 m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
107 req_throttle(cct, "rgw_ops", num_threads * 2),
108 rest(pe->rest),
109 conf(conf),
110 sock_fd(-1),
111 uri_prefix(pe->uri_prefix),
112 req_wq(this,
113 ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
114 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
115 &m_tp) {
116 }
117
118 virtual ~RGWProcess() = default;
119
120 virtual void run() = 0;
121 virtual void handle_request(const DoutPrefixProvider *dpp, RGWRequest *req) = 0;
122
123 void pause() {
124 m_tp.pause();
125 }
126
127 void unpause_with_new_config(rgw::sal::Store* const store,
128 rgw_auth_registry_ptr_t auth_registry) {
129 this->store = store;
130 this->auth_registry = std::move(auth_registry);
131 m_tp.unpause();
132 }
133
134 void close_fd() {
135 if (sock_fd >= 0) {
136 ::close(sock_fd);
137 sock_fd = -1;
138 }
139 }
140 }; /* RGWProcess */
141
142 class RGWProcessControlThread : public Thread {
143 RGWProcess *pprocess;
144 public:
145 explicit RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
146
147 void *entry() override {
148 pprocess->run();
149 return NULL;
150 }
151 };
152
153 class RGWLoadGenProcess : public RGWProcess {
154 RGWAccessKey access_key;
155 public:
156 RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
157 RGWFrontendConfig* _conf) :
158 RGWProcess(cct, pe, num_threads, _conf) {}
159 void run() override;
160 void checkpoint();
161 void handle_request(const DoutPrefixProvider *dpp, RGWRequest* req) override;
162 void gen_request(const std::string& method, const std::string& resource,
163 int content_length, std::atomic<bool>* fail_flag);
164
165 void set_access_key(RGWAccessKey& key) { access_key = key; }
166 };
167 /* process stream request */
168 extern int process_request(rgw::sal::Store* store,
169 RGWREST* rest,
170 RGWRequest* req,
171 const std::string& frontend_prefix,
172 const rgw_auth_registry_t& auth_registry,
173 RGWRestfulIO* client_io,
174 OpsLogSink* olog,
175 optional_yield y,
176 rgw::dmclock::Scheduler *scheduler,
177 std::string* user,
178 ceph::coarse_real_clock::duration* latency,
179 std::shared_ptr<RateLimiter> ratelimit,
180 int* http_ret = nullptr);
181
182 extern int rgw_process_authenticated(RGWHandler_REST* handler,
183 RGWOp*& op,
184 RGWRequest* req,
185 req_state* s,
186 optional_yield y,
187 rgw::sal::Store* store,
188 bool skip_retarget = false);
189
// Remove dout_subsys only when this header was the one that defined it
// (signalled by def_dout_subsys); an includer's own choice is left alone.
#ifdef def_dout_subsys
#undef dout_subsys
#undef def_dout_subsys
#endif

// dout_context was defined unconditionally near the top of this header.
#undef dout_context
195
196 #endif /* RGW_PROCESS_H */