]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/concurrency/TimerManager.cpp
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / cpp / src / thrift / concurrency / TimerManager.cpp
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/concurrency/TimerManager.h>
21 #include <thrift/concurrency/Exception.h>
22
23 #include <assert.h>
24 #include <iostream>
25 #include <memory>
26 #include <set>
27
28 namespace apache {
29 namespace thrift {
30 namespace concurrency {
31
32 using std::shared_ptr;
33 using std::weak_ptr;
34
35 /**
36 * TimerManager class
37 *
38 * @version $Id:$
39 */
40 class TimerManager::Task : public Runnable {
41
42 public:
43 enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE };
44
45 Task(shared_ptr<Runnable> runnable) : runnable_(runnable), state_(WAITING) {}
46
47 ~Task() override = default;
48
49 void run() override {
50 if (state_ == EXECUTING) {
51 runnable_->run();
52 state_ = COMPLETE;
53 }
54 }
55
56 bool operator==(const shared_ptr<Runnable> & runnable) const { return runnable_ == runnable; }
57
58 task_iterator it_;
59
60 private:
61 shared_ptr<Runnable> runnable_;
62 friend class TimerManager::Dispatcher;
63 STATE state_;
64 };
65
66 class TimerManager::Dispatcher : public Runnable {
67
68 public:
69 Dispatcher(TimerManager* manager) : manager_(manager) {}
70
71 ~Dispatcher() override = default;
72
73 /**
74 * Dispatcher entry point
75 *
76 * As long as dispatcher thread is running, pull tasks off the task taskMap_
77 * and execute.
78 */
79 void run() override {
80 {
81 Synchronized s(manager_->monitor_);
82 if (manager_->state_ == TimerManager::STARTING) {
83 manager_->state_ = TimerManager::STARTED;
84 manager_->monitor_.notifyAll();
85 }
86 }
87
88 do {
89 std::set<shared_ptr<TimerManager::Task> > expiredTasks;
90 {
91 Synchronized s(manager_->monitor_);
92 task_iterator expiredTaskEnd;
93 auto now = std::chrono::steady_clock::now();
94 while (manager_->state_ == TimerManager::STARTED
95 && (expiredTaskEnd = manager_->taskMap_.upper_bound(now))
96 == manager_->taskMap_.begin()) {
97 std::chrono::milliseconds timeout(0);
98 if (!manager_->taskMap_.empty()) {
99 timeout = std::chrono::duration_cast<std::chrono::milliseconds>(manager_->taskMap_.begin()->first - now);
100 //because the unit of steady_clock is smaller than millisecond,timeout may be 0.
101 if (timeout.count() == 0) {
102 timeout = std::chrono::milliseconds(1);
103 }
104 manager_->monitor_.waitForTimeRelative(timeout);
105 } else {
106 manager_->monitor_.waitForTimeRelative(0);
107 }
108 now = std::chrono::steady_clock::now();
109 }
110
111 if (manager_->state_ == TimerManager::STARTED) {
112 for (auto ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
113 shared_ptr<TimerManager::Task> task = ix->second;
114 expiredTasks.insert(task);
115 task->it_ = manager_->taskMap_.end();
116 if (task->state_ == TimerManager::Task::WAITING) {
117 task->state_ = TimerManager::Task::EXECUTING;
118 }
119 manager_->taskCount_--;
120 }
121 manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
122 }
123 }
124
125 for (const auto & expiredTask : expiredTasks) {
126 expiredTask->run();
127 }
128
129 } while (manager_->state_ == TimerManager::STARTED);
130
131 {
132 Synchronized s(manager_->monitor_);
133 if (manager_->state_ == TimerManager::STOPPING) {
134 manager_->state_ = TimerManager::STOPPED;
135 manager_->monitor_.notifyAll();
136 }
137 }
138 return;
139 }
140
141 private:
142 TimerManager* manager_;
143 friend class TimerManager;
144 };
145
146 #if defined(_MSC_VER)
147 #pragma warning(push)
148 #pragma warning(disable : 4355) // 'this' used in base member initializer list
149 #endif
150
151 TimerManager::TimerManager()
152 : taskCount_(0),
153 state_(TimerManager::UNINITIALIZED),
154 dispatcher_(std::make_shared<Dispatcher>(this)) {
155 }
156
157 #if defined(_MSC_VER)
158 #pragma warning(pop)
159 #endif
160
161 TimerManager::~TimerManager() {
162
163 // If we haven't been explicitly stopped, do so now. We don't need to grab
164 // the monitor here, since stop already takes care of reentrancy.
165
166 if (state_ != STOPPED) {
167 try {
168 stop();
169 } catch (...) {
170 // We're really hosed.
171 }
172 }
173 }
174
175 void TimerManager::start() {
176 bool doStart = false;
177 {
178 Synchronized s(monitor_);
179 if (!threadFactory_) {
180 throw InvalidArgumentException();
181 }
182 if (state_ == TimerManager::UNINITIALIZED) {
183 state_ = TimerManager::STARTING;
184 doStart = true;
185 }
186 }
187
188 if (doStart) {
189 dispatcherThread_ = threadFactory_->newThread(dispatcher_);
190 dispatcherThread_->start();
191 }
192
193 {
194 Synchronized s(monitor_);
195 while (state_ == TimerManager::STARTING) {
196 monitor_.wait();
197 }
198 assert(state_ != TimerManager::STARTING);
199 }
200 }
201
202 void TimerManager::stop() {
203 bool doStop = false;
204 {
205 Synchronized s(monitor_);
206 if (state_ == TimerManager::UNINITIALIZED) {
207 state_ = TimerManager::STOPPED;
208 } else if (state_ != STOPPING && state_ != STOPPED) {
209 doStop = true;
210 state_ = STOPPING;
211 monitor_.notifyAll();
212 }
213 while (state_ != STOPPED) {
214 monitor_.wait();
215 }
216 }
217
218 if (doStop) {
219 // Clean up any outstanding tasks
220 taskMap_.clear();
221
222 // Remove dispatcher's reference to us.
223 dispatcher_->manager_ = nullptr;
224 }
225 }
226
227 shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
228 Synchronized s(monitor_);
229 return threadFactory_;
230 }
231
232 void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
233 Synchronized s(monitor_);
234 threadFactory_ = value;
235 }
236
237 size_t TimerManager::taskCount() const {
238 return taskCount_;
239 }
240
241 TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task, const std::chrono::milliseconds &timeout) {
242 return add(task, std::chrono::steady_clock::now() + timeout);
243 }
244
245 TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task,
246 const std::chrono::time_point<std::chrono::steady_clock>& abstime) {
247 auto now = std::chrono::steady_clock::now();
248
249 if (abstime < now) {
250 throw InvalidArgumentException();
251 }
252 Synchronized s(monitor_);
253 if (state_ != TimerManager::STARTED) {
254 throw IllegalStateException();
255 }
256
257 // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him
258 // if the expiration time is shorter than the current value. Need to test before we insert,
259 // because the new task might insert at the front.
260 bool notifyRequired = (taskCount_ == 0) ? true : abstime < taskMap_.begin()->first;
261
262 shared_ptr<Task> timer(new Task(task));
263 taskCount_++;
264 timer->it_ = taskMap_.emplace(abstime, timer);
265
266 // If the task map was empty, or if we have an expiration that is earlier
267 // than any previously seen, kick the dispatcher so it can update its
268 // timeout
269 if (notifyRequired) {
270 monitor_.notify();
271 }
272
273 return timer;
274 }
275
276 void TimerManager::remove(shared_ptr<Runnable> task) {
277 Synchronized s(monitor_);
278 if (state_ != TimerManager::STARTED) {
279 throw IllegalStateException();
280 }
281 bool found = false;
282 for (auto ix = taskMap_.begin(); ix != taskMap_.end();) {
283 if (*ix->second == task) {
284 found = true;
285 taskCount_--;
286 taskMap_.erase(ix++);
287 } else {
288 ++ix;
289 }
290 }
291 if (!found) {
292 throw NoSuchTaskException();
293 }
294 }
295
296 void TimerManager::remove(Timer handle) {
297 Synchronized s(monitor_);
298 if (state_ != TimerManager::STARTED) {
299 throw IllegalStateException();
300 }
301
302 shared_ptr<Task> task = handle.lock();
303 if (!task) {
304 throw NoSuchTaskException();
305 }
306
307 if (task->it_ == taskMap_.end()) {
308 // Task is being executed
309 throw UncancellableTaskException();
310 }
311
312 taskMap_.erase(task->it_);
313 taskCount_--;
314 }
315
316 TimerManager::STATE TimerManager::state() const {
317 return state_;
318 }
319 }
320 }
321 } // apache::thrift::concurrency