1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
27 #include <type_traits>
30 #include "arrow/result.h"
31 #include "arrow/status.h"
32 #include "arrow/util/cancel.h"
33 #include "arrow/util/functional.h"
34 #include "arrow/util/future.h"
35 #include "arrow/util/macros.h"
36 #include "arrow/util/visibility.h"
39 // Disable harmless warning for decorated name length limit
40 #pragma warning(disable : 4503)
45 /// \brief Get the capacity of the global thread pool
47 /// Return the number of worker threads in the thread pool to which
48 /// Arrow dispatches various CPU-bound tasks. This is an ideal number,
49 /// not necessarily the exact number of threads at a given point in time.
51 /// You can change this number using SetCpuThreadPoolCapacity().
/// \return the ideal number of worker threads, which may differ from the
/// number of threads alive at the time of the call
52 ARROW_EXPORT
int GetCpuThreadPoolCapacity();
54 /// \brief Set the capacity of the global thread pool
56 /// Set the number of worker threads in the thread pool to which
57 /// Arrow dispatches various CPU-bound tasks.
59 /// The current number is returned by GetCpuThreadPoolCapacity().
/// \param threads the desired number of worker threads
/// \return a Status reporting the outcome of the request
60 ARROW_EXPORT Status
SetCpuThreadPoolCapacity(int threads
);
64 // Hints about a task that may be used by an Executor.
65 // They are ignored by the provided ThreadPool implementation.
67 // The lower, the more urgent
69 // The IO transfer size in bytes
71 // The approximate CPU cost in number of instructions
72 int64_t cpu_cost
= -1;
73 // An application-specific ID
74 int64_t external_id
= -1;
77 class ARROW_EXPORT Executor
{
// Callable type (an internal::FnOnce) taking a `const Status&`. The Spawn()
// and Submit() helpers hand one of these to SpawnReal() alongside the task
// and its StopToken.
// NOTE(review): presumably invoked with the stop error when the StopToken
// fires before the task runs -- confirm against the Executor implementations.
79 using StopCallback
= internal::FnOnce
<void(const Status
&)>;
83 // Spawn a fire-and-forget task.
// This overload takes only the callable: it supplies default-constructed
// TaskHints and an unstoppable StopToken, perfect-forwards `func`, and
// returns whatever Status SpawnReal() reports.
84 template <typename Function
>
85 Status
Spawn(Function
&& func
) {
86 return SpawnReal(TaskHints
{}, std::forward
<Function
>(func
), StopToken::Unstoppable(),
// Spawn overload taking a caller-provided StopToken. TaskHints are
// default-constructed; `func` is perfect-forwarded and `stop_token` is moved
// into SpawnReal(), whose Status is returned.
89 template <typename Function
>
90 Status
Spawn(Function
&& func
, StopToken stop_token
) {
91 return SpawnReal(TaskHints
{}, std::forward
<Function
>(func
), std::move(stop_token
),
// Spawn overload taking explicit TaskHints but no StopToken: the task is
// submitted to SpawnReal() with StopToken::Unstoppable(), and the resulting
// Status is returned.
94 template <typename Function
>
95 Status
Spawn(TaskHints hints
, Function
&& func
) {
96 return SpawnReal(hints
, std::forward
<Function
>(func
), StopToken::Unstoppable(),
// Spawn overload taking both TaskHints and a StopToken. `func` is
// perfect-forwarded and `stop_token` is moved into SpawnReal(), whose Status
// is returned.
99 template <typename Function
>
100 Status
Spawn(TaskHints hints
, Function
&& func
, StopToken stop_token
) {
101 return SpawnReal(hints
, std::forward
<Function
>(func
), std::move(stop_token
),
// Most general Spawn overload: the caller provides the TaskHints, the
// callable, the StopToken and the StopCallback. Every argument is passed
// straight through to SpawnReal() (the token and callback are moved), and
// SpawnReal()'s Status is returned.
104 template <typename Function
>
105 Status
Spawn(TaskHints hints
, Function
&& func
, StopToken stop_token
,
106 StopCallback stop_callback
) {
107 return SpawnReal(hints
, std::forward
<Function
>(func
), std::move(stop_token
),
108 std::move(stop_callback
));
111 // Transfers a future to this executor. Any continuations added to the
112 // returned future will run in this executor. Otherwise they would run
113 // on the same thread that called MarkFinished.
115 // This is necessary when (for example) an I/O task is completing a future.
116 // The continuations of that future should run on the CPU thread pool keeping
117 // CPU heavy work off the I/O thread pool. So the I/O task should transfer
118 // the future to the CPU executor before returning.
120 // By default this method will only transfer if the future is not already completed. If
121 // the future is already completed then any callback would be run synchronously and so
122 // no transfer is typically necessary. However, in cases where you want to force a
123 // transfer (e.g. to help the scheduler break up units of work across multiple cores)
124 // then you can override this behavior with `always_transfer`.
// Implemented as DoTransfer(std::move(future), /*always_transfer=*/false);
// the future produced by DoTransfer() is returned to the caller.
125 template <typename T
>
126 Future
<T
> Transfer(Future
<T
> future
) {
127 return DoTransfer(std::move(future
), false);
130 // Overload of Transfer which will always schedule callbacks on new threads even if the
131 // future is finished when the callback is added.
133 // This can be useful in cases where you want to ensure parallelism
// Implemented as DoTransfer(std::move(future), /*always_transfer=*/true);
// the future produced by DoTransfer() is returned to the caller.
134 template <typename T
>
135 Future
<T
> TransferAlways(Future
<T
> future
) {
136 return DoTransfer(std::move(future
), true);
139 // Submit a callable and arguments for execution. Return a future that
140 // will return the callable's result value once.
141 // The callable's arguments are copied before execution.
// General Submit overload: explicit TaskHints and StopToken.
// FutureType defaults to ContinueFuture::ForSignature<Function && (Args && ...)>
// and the function returns a Result<FutureType>.
142 template <typename Function
, typename
... Args
,
143 typename FutureType
= typename ::arrow::detail::ContinueFuture::ForSignature
<
144 Function
&& (Args
&& ...)>>
145 Result
<FutureType
> Submit(TaskHints hints
, StopToken stop_token
, Function
&& func
,
147 using ValueType
= typename
FutureType::ValueType
;
// Fresh future of the deduced type; a copy of it is bound into the task below.
149 auto future
= FutureType::Make();
// Package ContinueFuture, the future, the callable and its arguments into a
// single nullary task object.
150 auto task
= std::bind(::arrow::detail::ContinueFuture
{}, future
,
151 std::forward
<Function
>(func
), std::forward
<Args
>(args
)...);
// State of the stop callback: only a weak handle to the future is kept.
153 WeakFuture
<ValueType
> weak_fut
;
// When called with a Status, finish the future with it, provided a valid
// future can still be obtained from the weak handle.
155 void operator()(const Status
& st
) {
156 auto fut
= weak_fut
.get();
157 if (fut
.is_valid()) {
158 fut
.MarkFinished(st
);
161 } stop_callback
{WeakFuture
<ValueType
>(future
)};
// Hand the task, stop token and stop callback to SpawnReal(); a failed Status
// is propagated to the caller through ARROW_RETURN_NOT_OK.
162 ARROW_RETURN_NOT_OK(SpawnReal(hints
, std::move(task
), std::move(stop_token
),
163 std::move(stop_callback
)));
// Submit overload without TaskHints: default-constructs TaskHints and
// delegates to the overload taking (TaskHints, StopToken, func, args...).
// NOTE(review): `stop_token` is passed on as an lvalue (copied), whereas the
// Spawn overloads move their token -- consider std::move here.
168 template <typename Function
, typename
... Args
,
169 typename FutureType
= typename ::arrow::detail::ContinueFuture::ForSignature
<
170 Function
&& (Args
&& ...)>>
171 Result
<FutureType
> Submit(StopToken stop_token
, Function
&& func
, Args
&&... args
) {
172 return Submit(TaskHints
{}, stop_token
, std::forward
<Function
>(func
),
173 std::forward
<Args
>(args
)...);
// Submit overload without a StopToken: moves `hints` on and uses
// StopToken::Unstoppable(), delegating to the overload taking
// (TaskHints, StopToken, func, args...).
176 template <typename Function
, typename
... Args
,
177 typename FutureType
= typename ::arrow::detail::ContinueFuture::ForSignature
<
178 Function
&& (Args
&& ...)>>
179 Result
<FutureType
> Submit(TaskHints hints
, Function
&& func
, Args
&&... args
) {
180 return Submit(std::move(hints
), StopToken::Unstoppable(),
181 std::forward
<Function
>(func
), std::forward
<Args
>(args
)...);
// Simplest Submit overload: only the callable and its arguments. Uses
// default-constructed TaskHints and StopToken::Unstoppable(), delegating to
// the overload taking (TaskHints, StopToken, func, args...).
184 template <typename Function
, typename
... Args
,
185 typename FutureType
= typename ::arrow::detail::ContinueFuture::ForSignature
<
186 Function
&& (Args
&& ...)>>
187 Result
<FutureType
> Submit(Function
&& func
, Args
&&... args
) {
188 return Submit(TaskHints
{}, StopToken::Unstoppable(), std::forward
<Function
>(func
),
189 std::forward
<Args
>(args
)...);
192 // Return the level of parallelism (the number of tasks that may be executed
193 // concurrently). This may be an approximate number.
194 virtual int GetCapacity() = 0;
196 // Return true if the thread from which this function is called is owned by this
197 // Executor. Returns false if this Executor does not support this property.
198 virtual bool OwnsThisThread() { return false; }
201 ARROW_DISALLOW_COPY_AND_ASSIGN(Executor
);
203 Executor() = default;
// Shared implementation behind Transfer() (always_transfer = false) and
// TransferAlways() (always_transfer = true). A new future, `transferred`, is
// created and completed with the result of `future`; `always_transfer`
// selects which of the two strategies below is used to do so.
205 template <typename T
, typename FT
= Future
<T
>, typename FTSync
= typename
FT::SyncType
>
206 Future
<T
> DoTransfer(Future
<T
> future
, bool always_transfer
= false) {
// New future that will receive the result of `future`.
207 auto transferred
= Future
<T
>::Make();
208 if (always_transfer
) {
// Forced path: the callback is always scheduled, with this executor set as
// the executor in the callback options.
209 CallbackOptions callback_options
= CallbackOptions::Defaults();
210 callback_options
.should_schedule
= ShouldSchedule::Always
;
211 callback_options
.executor
= this;
// The callback itself just copies the result into `transferred`.
212 auto sync_callback
= [transferred
](const FTSync
& result
) mutable {
213 transferred
.MarkFinished(result
);
// Attach the callback to the source future using the options built above.
215 future
.AddCallback(sync_callback
, callback_options
);
219 // We could use AddCallback's ShouldSchedule::IfUnfinished but we can save a bit of
220 // work by doing the test here.
// Non-forced path: the callback spawns a task on this executor, and that
// task finishes `transferred` with the result.
221 auto callback
= [this, transferred
](const FTSync
& result
) mutable {
223 Spawn([transferred
, result
]() mutable { transferred
.MarkFinished(result
); });
// `spawn_status` is declared on a line not visible here; presumably it holds
// the Status of the Spawn() call above. On failure, `transferred` is finished
// with that error instead.
224 if (!spawn_status
.ok()) {
225 transferred
.MarkFinished(spawn_status
);
// TryAddCallback() takes a factory; this one returns a copy of `callback`.
228 auto callback_factory
= [&callback
]() { return callback
; };
229 if (future
.TryAddCallback(callback_factory
)) {
232 // If the future is already finished and we aren't going to force spawn a thread
233 // then we don't need to add another layer of callback and can return the original
// future.
// Virtual entry point that the Spawn() and Submit() helpers in this class
// funnel into. Receives the task hints, the task itself (a nullary FnOnce)
// and a StopToken; the remainder of the parameter list is on a line not
// visible here (the overrides in this file take a `StopCallback&&` last).
239 virtual Status
SpawnReal(TaskHints hints
, FnOnce
<void()> task
, StopToken
,
243 /// \brief An executor implementation that runs all tasks on a single thread using an
/// event loop.
246 /// Note: Any sort of nested parallelism will deadlock this executor. Blocking waits are
247 /// fine but if one task needs to wait for another task it must be expressed as an
248 /// asynchronous continuation.
249 class ARROW_EXPORT SerialExecutor
: public Executor
{
// Type of the task a SerialExecutor is started with: a FnOnce that receives
// an Executor* (Run() passes `this`) and returns a Future<T>. T defaults to
// ::arrow::internal::Empty.
251 template <typename T
= ::arrow::internal::Empty
>
252 using TopLevelTask
= internal::FnOnce
<Future
<T
>(Executor
*)>;
254 ~SerialExecutor() override
;
// Always reports a capacity of 1.
// NOTE(review): the ';' after the function body is a redundant empty
// declaration and can trigger extra-semicolon warnings.
256 int GetCapacity() override
{ return 1; };
257 Status
SpawnReal(TaskHints hints
, FnOnce
<void()> task
, StopToken
,
258 StopCallback
&&) override
;
260 /// \brief Runs the TopLevelTask and any scheduled tasks
262 /// The TopLevelTask (or one of the tasks it schedules) must either return an invalid
263 /// status or call the finish signal. Failure to do this will result in a deadlock. For
264 /// this reason it is preferable (if possible) to use the helper methods (below)
265 /// RunSynchronously/RunSerially which delegate the responsibility onto a Future
266 /// producer's existing responsibility to always mark a future finished (which can
267 /// someday be aided by ARROW-12207).
268 template <typename T
= internal::Empty
, typename FT
= Future
<T
>,
269 typename FTSync
= typename
FT::SyncType
>
270 static FTSync
RunInSerialExecutor(TopLevelTask
<T
> initial_task
) {
// Run the task on a temporary SerialExecutor, then convert the resulting
// future to its synchronous form with FutureToSync().
271 Future
<T
> fut
= SerialExecutor().Run
<T
>(std::move(initial_task
));
272 return FutureToSync(fut
);
280 std::shared_ptr
<State
> state_
;
// Invokes `initial_task` with this executor and acts on the future it
// returns. Only part of the body is visible here.
282 template <typename T
, typename FTSync
= typename Future
<T
>::SyncType
>
283 Future
<T
> Run(TopLevelTask
<T
> initial_task
) {
// Call the top-level task, handing it this executor.
284 auto final_fut
= std::move(initial_task
)(this);
// Case where the future is already finished on return (its handling is on
// lines not visible here).
285 if (final_fut
.is_finished()) {
// Register a callback on the future that calls MarkFinished() on this
// executor; the future's result itself is ignored by the callback.
288 final_fut
.AddCallback([this](const FTSync
&) { MarkFinished(); });
296 /// An Executor implementation spawning tasks in FIFO manner on a fixed-size
297 /// pool of worker threads.
299 /// Note: Any sort of nested parallelism will deadlock this executor. Blocking waits are
300 /// fine but if one task needs to wait for another task it must be expressed as an
301 /// asynchronous continuation.
302 class ARROW_EXPORT ThreadPool
: public Executor
{
304 // Construct a thread pool with the given number of worker threads
305 static Result
<std::shared_ptr
<ThreadPool
>> Make(int threads
);
307 // Like Make(), but takes care that the returned ThreadPool is compatible
308 // with destruction late at process exit.
309 static Result
<std::shared_ptr
<ThreadPool
>> MakeEternal(int threads
);
311 // Destroy thread pool; the pool will first be shut down
312 ~ThreadPool() override
;
314 // Return the desired number of worker threads.
315 // The actual number of workers may lag a bit before being adjusted to
// match this value.
317 int GetCapacity() override
;
319 bool OwnsThisThread() override
;
321 // Return the number of tasks either running or in the queue.
324 // Dynamically change the number of worker threads.
326 // This function always returns immediately.
327 // If fewer threads are running than this number, new threads are spawned
328 // on-demand when needed for task execution.
329 // If more threads are running than this number, excess threads are reaped
330 // as soon as possible.
331 Status
SetCapacity(int threads
);
333 // Heuristic for the default capacity of a thread pool for CPU-bound tasks.
334 // This is exposed as a static method to help with testing.
335 static int DefaultCapacity();
337 // Shutdown the pool. Once the pool starts shutting down, new tasks
338 // cannot be submitted anymore.
339 // If "wait" is true, shutdown waits for all pending tasks to be finished.
340 // If "wait" is false, workers are stopped as soon as currently executing
341 // tasks are finished.
342 Status
Shutdown(bool wait
= true);
344 // Wait for the thread pool to become idle
346 // This is useful for sequencing tests
352 FRIEND_TEST(TestThreadPool
, SetCapacity
);
353 FRIEND_TEST(TestGlobalThreadPool
, Capacity
);
354 friend ARROW_EXPORT ThreadPool
* GetCpuThreadPool();
358 Status
SpawnReal(TaskHints hints
, FnOnce
<void()> task
, StopToken
,
359 StopCallback
&&) override
;
361 // Collect finished worker threads, making sure the OS threads have exited
362 void CollectFinishedWorkersUnlocked();
363 // Launch a given number of additional workers
364 void LaunchWorkersUnlocked(int threads
);
365 // Get the current actual capacity
366 int GetActualCapacity();
367 // Reinitialize the thread pool if the pid changed
368 void ProtectAgainstFork();
370 static std::shared_ptr
<ThreadPool
> MakeCpuThreadPool();
372 std::shared_ptr
<State
> sp_state_
;
374 bool shutdown_on_destroy_
;
380 // Return the process-global thread pool for CPU-bound tasks.
381 ARROW_EXPORT ThreadPool
* GetCpuThreadPool();
383 /// \brief Potentially run an async operation serially (if use_threads is false)
386 /// If `use_threads` is true, the global CPU executor is used.
387 /// If `use_threads` is false, a temporary SerialExecutor is used.
388 /// `get_future` is called (from this thread) with the chosen executor and must
389 /// return a future that will eventually finish. This function returns once the
390 /// future has finished.
391 template <typename Fut
, typename ValueType
= typename
Fut::ValueType
>
392 typename
Fut::SyncType
RunSynchronously(FnOnce
<Fut(Executor
*)> get_future
,
// Threaded path: obtain the future by calling `get_future` with the global
// CPU thread pool, then convert it to its synchronous form.
395 auto fut
= std::move(get_future
)(GetCpuThreadPool());
396 return FutureToSync(fut
);
// Serial path: run `get_future` inside a temporary SerialExecutor.
398 return SerialExecutor::RunInSerialExecutor
<ValueType
>(std::move(get_future
));
402 } // namespace internal