]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
7c673cae | 3 | |
1e59de90 | 4 | #pragma once |
7c673cae FG |
5 | |
6 | #include "rgw_common.h" | |
7c673cae | 7 | #include "rgw_acl.h" |
7c673cae | 8 | #include "rgw_user.h" |
7c673cae | 9 | #include "rgw_rest.h" |
11fdf7f2 | 10 | #include "include/ceph_assert.h" |
7c673cae FG |
11 | |
12 | #include "common/WorkQueue.h" | |
13 | #include "common/Throttle.h" | |
14 | ||
15 | #include <atomic> | |
16 | ||
7c673cae FG |
17 | #define dout_context g_ceph_context |
18 | ||
7c673cae | 19 | |
11fdf7f2 TL |
20 | namespace rgw::dmclock { |
21 | class Scheduler; | |
22 | } | |
23 | ||
1e59de90 | 24 | struct RGWProcessEnv; |
7c673cae | 25 | class RGWFrontendConfig; |
f67539c2 | 26 | class RGWRequest; |
7c673cae FG |
27 | |
28 | class RGWProcess { | |
20effc67 | 29 | std::deque<RGWRequest*> m_req_queue; |
7c673cae FG |
30 | protected: |
31 | CephContext *cct; | |
1e59de90 | 32 | RGWProcessEnv& env; |
7c673cae FG |
33 | ThreadPool m_tp; |
34 | Throttle req_throttle; | |
7c673cae FG |
35 | RGWFrontendConfig* conf; |
36 | int sock_fd; | |
37 | std::string uri_prefix; | |
38 | ||
b3b6e05e | 39 | struct RGWWQ : public DoutPrefixProvider, public ThreadPool::WorkQueue<RGWRequest> { |
7c673cae | 40 | RGWProcess* process; |
f67539c2 TL |
41 | RGWWQ(RGWProcess* p, ceph::timespan timeout, ceph::timespan suicide_timeout, |
42 | ThreadPool* tp) | |
7c673cae FG |
43 | : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout, |
44 | tp), process(p) {} | |
45 | ||
11fdf7f2 | 46 | bool _enqueue(RGWRequest* req) override; |
7c673cae FG |
47 | |
48 | void _dequeue(RGWRequest* req) override { | |
49 | ceph_abort(); | |
50 | } | |
51 | ||
52 | bool _empty() override { | |
53 | return process->m_req_queue.empty(); | |
54 | } | |
55 | ||
11fdf7f2 | 56 | RGWRequest* _dequeue() override; |
7c673cae FG |
57 | |
58 | using ThreadPool::WorkQueue<RGWRequest>::_process; | |
59 | ||
11fdf7f2 | 60 | void _process(RGWRequest *req, ThreadPool::TPHandle &) override; |
7c673cae FG |
61 | |
62 | void _dump_queue(); | |
63 | ||
64 | void _clear() override { | |
11fdf7f2 | 65 | ceph_assert(process->m_req_queue.empty()); |
7c673cae | 66 | } |
b3b6e05e TL |
67 | |
68 | CephContext *get_cct() const override { return process->cct; } | |
69 | unsigned get_subsys() const { return ceph_subsys_rgw; } | |
70 | std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw request work queue: ";} | |
71 | ||
7c673cae FG |
72 | } req_wq; |
73 | ||
74 | public: | |
75 | RGWProcess(CephContext* const cct, | |
1e59de90 | 76 | RGWProcessEnv& env, |
7c673cae | 77 | const int num_threads, |
1e59de90 | 78 | std::string uri_prefix, |
7c673cae | 79 | RGWFrontendConfig* const conf) |
1e59de90 | 80 | : cct(cct), env(env), |
7c673cae FG |
81 | m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads), |
82 | req_throttle(cct, "rgw_ops", num_threads * 2), | |
7c673cae FG |
83 | conf(conf), |
84 | sock_fd(-1), | |
1e59de90 | 85 | uri_prefix(std::move(uri_prefix)), |
f67539c2 TL |
86 | req_wq(this, |
87 | ceph::make_timespan(g_conf()->rgw_op_thread_timeout), | |
88 | ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout), | |
89 | &m_tp) { | |
7c673cae | 90 | } |
1e59de90 | 91 | |
7c673cae FG |
92 | virtual ~RGWProcess() = default; |
93 | ||
1e59de90 TL |
94 | const RGWProcessEnv& get_env() const { return env; } |
95 | ||
7c673cae | 96 | virtual void run() = 0; |
b3b6e05e | 97 | virtual void handle_request(const DoutPrefixProvider *dpp, RGWRequest *req) = 0; |
7c673cae FG |
98 | |
99 | void pause() { | |
100 | m_tp.pause(); | |
101 | } | |
102 | ||
1e59de90 | 103 | void unpause_with_new_config() { |
7c673cae FG |
104 | m_tp.unpause(); |
105 | } | |
106 | ||
107 | void close_fd() { | |
108 | if (sock_fd >= 0) { | |
109 | ::close(sock_fd); | |
110 | sock_fd = -1; | |
111 | } | |
112 | } | |
113 | }; /* RGWProcess */ | |
114 | ||
7c673cae FG |
115 | class RGWProcessControlThread : public Thread { |
116 | RGWProcess *pprocess; | |
117 | public: | |
11fdf7f2 | 118 | explicit RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {} |
7c673cae FG |
119 | |
120 | void *entry() override { | |
121 | pprocess->run(); | |
122 | return NULL; | |
123 | } | |
124 | }; | |
125 | ||
126 | class RGWLoadGenProcess : public RGWProcess { | |
127 | RGWAccessKey access_key; | |
128 | public: | |
1e59de90 TL |
129 | RGWLoadGenProcess(CephContext* cct, RGWProcessEnv& env, int num_threads, |
130 | std::string uri_prefix, RGWFrontendConfig* _conf) | |
131 | : RGWProcess(cct, env, num_threads, std::move(uri_prefix), _conf) {} | |
7c673cae FG |
132 | void run() override; |
133 | void checkpoint(); | |
b3b6e05e | 134 | void handle_request(const DoutPrefixProvider *dpp, RGWRequest* req) override; |
20effc67 | 135 | void gen_request(const std::string& method, const std::string& resource, |
31f18b77 | 136 | int content_length, std::atomic<bool>* fail_flag); |
7c673cae FG |
137 | |
138 | void set_access_key(RGWAccessKey& key) { access_key = key; } | |
139 | }; | |
7c673cae | 140 | /* process stream request */ |
1e59de90 | 141 | extern int process_request(const RGWProcessEnv& penv, |
7c673cae FG |
142 | RGWRequest* req, |
143 | const std::string& frontend_prefix, | |
7c673cae | 144 | RGWRestfulIO* client_io, |
11fdf7f2 TL |
145 | optional_yield y, |
146 | rgw::dmclock::Scheduler *scheduler, | |
f67539c2 TL |
147 | std::string* user, |
148 | ceph::coarse_real_clock::duration* latency, | |
94b18763 | 149 | int* http_ret = nullptr); |
7c673cae FG |
150 | |
151 | extern int rgw_process_authenticated(RGWHandler_REST* handler, | |
152 | RGWOp*& op, | |
153 | RGWRequest* req, | |
154 | req_state* s, | |
20effc67 | 155 | optional_yield y, |
1e59de90 | 156 | rgw::sal::Driver* driver, |
7c673cae FG |
157 | bool skip_retarget = false); |
158 | ||
7c673cae | 159 | #undef dout_context |