]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | #include <thrift/concurrency/TimerManager.h> | |
21 | #include <thrift/concurrency/Exception.h> | |
22 | ||
23 | #include <assert.h> | |
24 | #include <iostream> | |
25 | #include <memory> | |
26 | #include <set> | |
27 | ||
28 | namespace apache { | |
29 | namespace thrift { | |
30 | namespace concurrency { | |
31 | ||
32 | using std::shared_ptr; | |
33 | using std::weak_ptr; | |
34 | ||
35 | /** | |
36 | * TimerManager class | |
37 | * | |
38 | * @version $Id:$ | |
39 | */ | |
40 | class TimerManager::Task : public Runnable { | |
41 | ||
42 | public: | |
43 | enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE }; | |
44 | ||
45 | Task(shared_ptr<Runnable> runnable) : runnable_(runnable), state_(WAITING) {} | |
46 | ||
47 | ~Task() override = default; | |
48 | ||
49 | void run() override { | |
50 | if (state_ == EXECUTING) { | |
51 | runnable_->run(); | |
52 | state_ = COMPLETE; | |
53 | } | |
54 | } | |
55 | ||
56 | bool operator==(const shared_ptr<Runnable> & runnable) const { return runnable_ == runnable; } | |
57 | ||
58 | task_iterator it_; | |
59 | ||
60 | private: | |
61 | shared_ptr<Runnable> runnable_; | |
62 | friend class TimerManager::Dispatcher; | |
63 | STATE state_; | |
64 | }; | |
65 | ||
66 | class TimerManager::Dispatcher : public Runnable { | |
67 | ||
68 | public: | |
69 | Dispatcher(TimerManager* manager) : manager_(manager) {} | |
70 | ||
71 | ~Dispatcher() override = default; | |
72 | ||
73 | /** | |
74 | * Dispatcher entry point | |
75 | * | |
76 | * As long as dispatcher thread is running, pull tasks off the task taskMap_ | |
77 | * and execute. | |
78 | */ | |
79 | void run() override { | |
80 | { | |
81 | Synchronized s(manager_->monitor_); | |
82 | if (manager_->state_ == TimerManager::STARTING) { | |
83 | manager_->state_ = TimerManager::STARTED; | |
84 | manager_->monitor_.notifyAll(); | |
85 | } | |
86 | } | |
87 | ||
88 | do { | |
89 | std::set<shared_ptr<TimerManager::Task> > expiredTasks; | |
90 | { | |
91 | Synchronized s(manager_->monitor_); | |
92 | task_iterator expiredTaskEnd; | |
93 | auto now = std::chrono::steady_clock::now(); | |
94 | while (manager_->state_ == TimerManager::STARTED | |
95 | && (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) | |
96 | == manager_->taskMap_.begin()) { | |
97 | std::chrono::milliseconds timeout(0); | |
98 | if (!manager_->taskMap_.empty()) { | |
99 | timeout = std::chrono::duration_cast<std::chrono::milliseconds>(manager_->taskMap_.begin()->first - now); | |
100 | //because the unit of steady_clock is smaller than millisecond,timeout may be 0. | |
101 | if (timeout.count() == 0) { | |
102 | timeout = std::chrono::milliseconds(1); | |
103 | } | |
104 | manager_->monitor_.waitForTimeRelative(timeout); | |
105 | } else { | |
106 | manager_->monitor_.waitForTimeRelative(0); | |
107 | } | |
108 | now = std::chrono::steady_clock::now(); | |
109 | } | |
110 | ||
111 | if (manager_->state_ == TimerManager::STARTED) { | |
112 | for (auto ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) { | |
113 | shared_ptr<TimerManager::Task> task = ix->second; | |
114 | expiredTasks.insert(task); | |
115 | task->it_ = manager_->taskMap_.end(); | |
116 | if (task->state_ == TimerManager::Task::WAITING) { | |
117 | task->state_ = TimerManager::Task::EXECUTING; | |
118 | } | |
119 | manager_->taskCount_--; | |
120 | } | |
121 | manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd); | |
122 | } | |
123 | } | |
124 | ||
125 | for (const auto & expiredTask : expiredTasks) { | |
126 | expiredTask->run(); | |
127 | } | |
128 | ||
129 | } while (manager_->state_ == TimerManager::STARTED); | |
130 | ||
131 | { | |
132 | Synchronized s(manager_->monitor_); | |
133 | if (manager_->state_ == TimerManager::STOPPING) { | |
134 | manager_->state_ = TimerManager::STOPPED; | |
135 | manager_->monitor_.notifyAll(); | |
136 | } | |
137 | } | |
138 | return; | |
139 | } | |
140 | ||
141 | private: | |
142 | TimerManager* manager_; | |
143 | friend class TimerManager; | |
144 | }; | |
145 | ||
146 | #if defined(_MSC_VER) | |
147 | #pragma warning(push) | |
148 | #pragma warning(disable : 4355) // 'this' used in base member initializer list | |
149 | #endif | |
150 | ||
151 | TimerManager::TimerManager() | |
152 | : taskCount_(0), | |
153 | state_(TimerManager::UNINITIALIZED), | |
154 | dispatcher_(std::make_shared<Dispatcher>(this)) { | |
155 | } | |
156 | ||
157 | #if defined(_MSC_VER) | |
158 | #pragma warning(pop) | |
159 | #endif | |
160 | ||
161 | TimerManager::~TimerManager() { | |
162 | ||
163 | // If we haven't been explicitly stopped, do so now. We don't need to grab | |
164 | // the monitor here, since stop already takes care of reentrancy. | |
165 | ||
166 | if (state_ != STOPPED) { | |
167 | try { | |
168 | stop(); | |
169 | } catch (...) { | |
170 | // We're really hosed. | |
171 | } | |
172 | } | |
173 | } | |
174 | ||
175 | void TimerManager::start() { | |
176 | bool doStart = false; | |
177 | { | |
178 | Synchronized s(monitor_); | |
179 | if (!threadFactory_) { | |
180 | throw InvalidArgumentException(); | |
181 | } | |
182 | if (state_ == TimerManager::UNINITIALIZED) { | |
183 | state_ = TimerManager::STARTING; | |
184 | doStart = true; | |
185 | } | |
186 | } | |
187 | ||
188 | if (doStart) { | |
189 | dispatcherThread_ = threadFactory_->newThread(dispatcher_); | |
190 | dispatcherThread_->start(); | |
191 | } | |
192 | ||
193 | { | |
194 | Synchronized s(monitor_); | |
195 | while (state_ == TimerManager::STARTING) { | |
196 | monitor_.wait(); | |
197 | } | |
198 | assert(state_ != TimerManager::STARTING); | |
199 | } | |
200 | } | |
201 | ||
202 | void TimerManager::stop() { | |
203 | bool doStop = false; | |
204 | { | |
205 | Synchronized s(monitor_); | |
206 | if (state_ == TimerManager::UNINITIALIZED) { | |
207 | state_ = TimerManager::STOPPED; | |
208 | } else if (state_ != STOPPING && state_ != STOPPED) { | |
209 | doStop = true; | |
210 | state_ = STOPPING; | |
211 | monitor_.notifyAll(); | |
212 | } | |
213 | while (state_ != STOPPED) { | |
214 | monitor_.wait(); | |
215 | } | |
216 | } | |
217 | ||
218 | if (doStop) { | |
219 | // Clean up any outstanding tasks | |
220 | taskMap_.clear(); | |
221 | ||
222 | // Remove dispatcher's reference to us. | |
223 | dispatcher_->manager_ = nullptr; | |
224 | } | |
225 | } | |
226 | ||
227 | shared_ptr<const ThreadFactory> TimerManager::threadFactory() const { | |
228 | Synchronized s(monitor_); | |
229 | return threadFactory_; | |
230 | } | |
231 | ||
232 | void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) { | |
233 | Synchronized s(monitor_); | |
234 | threadFactory_ = value; | |
235 | } | |
236 | ||
237 | size_t TimerManager::taskCount() const { | |
238 | return taskCount_; | |
239 | } | |
240 | ||
241 | TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task, const std::chrono::milliseconds &timeout) { | |
242 | return add(task, std::chrono::steady_clock::now() + timeout); | |
243 | } | |
244 | ||
245 | TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task, | |
246 | const std::chrono::time_point<std::chrono::steady_clock>& abstime) { | |
247 | auto now = std::chrono::steady_clock::now(); | |
248 | ||
249 | if (abstime < now) { | |
250 | throw InvalidArgumentException(); | |
251 | } | |
252 | Synchronized s(monitor_); | |
253 | if (state_ != TimerManager::STARTED) { | |
254 | throw IllegalStateException(); | |
255 | } | |
256 | ||
257 | // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him | |
258 | // if the expiration time is shorter than the current value. Need to test before we insert, | |
259 | // because the new task might insert at the front. | |
260 | bool notifyRequired = (taskCount_ == 0) ? true : abstime < taskMap_.begin()->first; | |
261 | ||
262 | shared_ptr<Task> timer(new Task(task)); | |
263 | taskCount_++; | |
264 | timer->it_ = taskMap_.emplace(abstime, timer); | |
265 | ||
266 | // If the task map was empty, or if we have an expiration that is earlier | |
267 | // than any previously seen, kick the dispatcher so it can update its | |
268 | // timeout | |
269 | if (notifyRequired) { | |
270 | monitor_.notify(); | |
271 | } | |
272 | ||
273 | return timer; | |
274 | } | |
275 | ||
276 | void TimerManager::remove(shared_ptr<Runnable> task) { | |
277 | Synchronized s(monitor_); | |
278 | if (state_ != TimerManager::STARTED) { | |
279 | throw IllegalStateException(); | |
280 | } | |
281 | bool found = false; | |
282 | for (auto ix = taskMap_.begin(); ix != taskMap_.end();) { | |
283 | if (*ix->second == task) { | |
284 | found = true; | |
285 | taskCount_--; | |
286 | taskMap_.erase(ix++); | |
287 | } else { | |
288 | ++ix; | |
289 | } | |
290 | } | |
291 | if (!found) { | |
292 | throw NoSuchTaskException(); | |
293 | } | |
294 | } | |
295 | ||
296 | void TimerManager::remove(Timer handle) { | |
297 | Synchronized s(monitor_); | |
298 | if (state_ != TimerManager::STARTED) { | |
299 | throw IllegalStateException(); | |
300 | } | |
301 | ||
302 | shared_ptr<Task> task = handle.lock(); | |
303 | if (!task) { | |
304 | throw NoSuchTaskException(); | |
305 | } | |
306 | ||
307 | if (task->it_ == taskMap_.end()) { | |
308 | // Task is being executed | |
309 | throw UncancellableTaskException(); | |
310 | } | |
311 | ||
312 | taskMap_.erase(task->it_); | |
313 | taskCount_--; | |
314 | } | |
315 | ||
316 | TimerManager::STATE TimerManager::state() const { | |
317 | return state_; | |
318 | } | |
319 | } | |
320 | } | |
321 | } // apache::thrift::concurrency |