2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 #include <thrift/concurrency/TimerManager.h>
21 #include <thrift/concurrency/Exception.h>
30 namespace concurrency
{
32 using std::shared_ptr
;
40 class TimerManager::Task
: public Runnable
{
43 enum STATE
{ WAITING
, EXECUTING
, CANCELLED
, COMPLETE
};
45 Task(shared_ptr
<Runnable
> runnable
) : runnable_(runnable
), state_(WAITING
) {}
47 ~Task() override
= default;
50 if (state_
== EXECUTING
) {
56 bool operator==(const shared_ptr
<Runnable
> & runnable
) const { return runnable_
== runnable
; }
61 shared_ptr
<Runnable
> runnable_
;
62 friend class TimerManager::Dispatcher
;
66 class TimerManager::Dispatcher
: public Runnable
{
69 Dispatcher(TimerManager
* manager
) : manager_(manager
) {}
71 ~Dispatcher() override
= default;
74 * Dispatcher entry point
76 * As long as dispatcher thread is running, pull tasks off the task taskMap_
81 Synchronized
s(manager_
->monitor_
);
82 if (manager_
->state_
== TimerManager::STARTING
) {
83 manager_
->state_
= TimerManager::STARTED
;
84 manager_
->monitor_
.notifyAll();
89 std::set
<shared_ptr
<TimerManager::Task
> > expiredTasks
;
91 Synchronized
s(manager_
->monitor_
);
92 task_iterator expiredTaskEnd
;
93 auto now
= std::chrono::steady_clock::now();
94 while (manager_
->state_
== TimerManager::STARTED
95 && (expiredTaskEnd
= manager_
->taskMap_
.upper_bound(now
))
96 == manager_
->taskMap_
.begin()) {
97 std::chrono::milliseconds
timeout(0);
98 if (!manager_
->taskMap_
.empty()) {
99 timeout
= std::chrono::duration_cast
<std::chrono::milliseconds
>(manager_
->taskMap_
.begin()->first
- now
);
100 //because the unit of steady_clock is smaller than millisecond,timeout may be 0.
101 if (timeout
.count() == 0) {
102 timeout
= std::chrono::milliseconds(1);
104 manager_
->monitor_
.waitForTimeRelative(timeout
);
106 manager_
->monitor_
.waitForTimeRelative(0);
108 now
= std::chrono::steady_clock::now();
111 if (manager_
->state_
== TimerManager::STARTED
) {
112 for (auto ix
= manager_
->taskMap_
.begin(); ix
!= expiredTaskEnd
; ix
++) {
113 shared_ptr
<TimerManager::Task
> task
= ix
->second
;
114 expiredTasks
.insert(task
);
115 task
->it_
= manager_
->taskMap_
.end();
116 if (task
->state_
== TimerManager::Task::WAITING
) {
117 task
->state_
= TimerManager::Task::EXECUTING
;
119 manager_
->taskCount_
--;
121 manager_
->taskMap_
.erase(manager_
->taskMap_
.begin(), expiredTaskEnd
);
125 for (const auto & expiredTask
: expiredTasks
) {
129 } while (manager_
->state_
== TimerManager::STARTED
);
132 Synchronized
s(manager_
->monitor_
);
133 if (manager_
->state_
== TimerManager::STOPPING
) {
134 manager_
->state_
= TimerManager::STOPPED
;
135 manager_
->monitor_
.notifyAll();
142 TimerManager
* manager_
;
143 friend class TimerManager
;
146 #if defined(_MSC_VER)
147 #pragma warning(push)
148 #pragma warning(disable : 4355) // 'this' used in base member initializer list
151 TimerManager::TimerManager()
153 state_(TimerManager::UNINITIALIZED
),
154 dispatcher_(std::make_shared
<Dispatcher
>(this)) {
157 #if defined(_MSC_VER)
161 TimerManager::~TimerManager() {
163 // If we haven't been explicitly stopped, do so now. We don't need to grab
164 // the monitor here, since stop already takes care of reentrancy.
166 if (state_
!= STOPPED
) {
170 // We're really hosed.
175 void TimerManager::start() {
176 bool doStart
= false;
178 Synchronized
s(monitor_
);
179 if (!threadFactory_
) {
180 throw InvalidArgumentException();
182 if (state_
== TimerManager::UNINITIALIZED
) {
183 state_
= TimerManager::STARTING
;
189 dispatcherThread_
= threadFactory_
->newThread(dispatcher_
);
190 dispatcherThread_
->start();
194 Synchronized
s(monitor_
);
195 while (state_
== TimerManager::STARTING
) {
198 assert(state_
!= TimerManager::STARTING
);
202 void TimerManager::stop() {
205 Synchronized
s(monitor_
);
206 if (state_
== TimerManager::UNINITIALIZED
) {
207 state_
= TimerManager::STOPPED
;
208 } else if (state_
!= STOPPING
&& state_
!= STOPPED
) {
211 monitor_
.notifyAll();
213 while (state_
!= STOPPED
) {
219 // Clean up any outstanding tasks
222 // Remove dispatcher's reference to us.
223 dispatcher_
->manager_
= nullptr;
227 shared_ptr
<const ThreadFactory
> TimerManager::threadFactory() const {
228 Synchronized
s(monitor_
);
229 return threadFactory_
;
232 void TimerManager::threadFactory(shared_ptr
<const ThreadFactory
> value
) {
233 Synchronized
s(monitor_
);
234 threadFactory_
= value
;
237 size_t TimerManager::taskCount() const {
241 TimerManager::Timer
TimerManager::add(shared_ptr
<Runnable
> task
, const std::chrono::milliseconds
&timeout
) {
242 return add(task
, std::chrono::steady_clock::now() + timeout
);
245 TimerManager::Timer
TimerManager::add(shared_ptr
<Runnable
> task
,
246 const std::chrono::time_point
<std::chrono::steady_clock
>& abstime
) {
247 auto now
= std::chrono::steady_clock::now();
250 throw InvalidArgumentException();
252 Synchronized
s(monitor_
);
253 if (state_
!= TimerManager::STARTED
) {
254 throw IllegalStateException();
257 // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him
258 // if the expiration time is shorter than the current value. Need to test before we insert,
259 // because the new task might insert at the front.
260 bool notifyRequired
= (taskCount_
== 0) ? true : abstime
< taskMap_
.begin()->first
;
262 shared_ptr
<Task
> timer(new Task(task
));
264 timer
->it_
= taskMap_
.emplace(abstime
, timer
);
266 // If the task map was empty, or if we have an expiration that is earlier
267 // than any previously seen, kick the dispatcher so it can update its
269 if (notifyRequired
) {
276 void TimerManager::remove(shared_ptr
<Runnable
> task
) {
277 Synchronized
s(monitor_
);
278 if (state_
!= TimerManager::STARTED
) {
279 throw IllegalStateException();
282 for (auto ix
= taskMap_
.begin(); ix
!= taskMap_
.end();) {
283 if (*ix
->second
== task
) {
286 taskMap_
.erase(ix
++);
292 throw NoSuchTaskException();
296 void TimerManager::remove(Timer handle
) {
297 Synchronized
s(monitor_
);
298 if (state_
!= TimerManager::STARTED
) {
299 throw IllegalStateException();
302 shared_ptr
<Task
> task
= handle
.lock();
304 throw NoSuchTaskException();
307 if (task
->it_
== taskMap_
.end()) {
308 // Task is being executed
309 throw UncancellableTaskException();
312 taskMap_
.erase(task
->it_
);
316 TimerManager::STATE
TimerManager::state() const {
321 } // apache::thrift::concurrency