]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/util/thread_pool.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / util / thread_pool.h
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #pragma once
19
20 #ifndef _WIN32
21 #include <unistd.h>
22 #endif
23
24 #include <cstdint>
25 #include <memory>
26 #include <queue>
27 #include <type_traits>
28 #include <utility>
29
30 #include "arrow/result.h"
31 #include "arrow/status.h"
32 #include "arrow/util/cancel.h"
33 #include "arrow/util/functional.h"
34 #include "arrow/util/future.h"
35 #include "arrow/util/macros.h"
36 #include "arrow/util/visibility.h"
37
38 #if defined(_MSC_VER)
39 // Disable harmless warning for decorated name length limit
40 #pragma warning(disable : 4503)
41 #endif
42
43 namespace arrow {
44
45 /// \brief Get the capacity of the global thread pool
46 ///
47 /// Return the number of worker threads in the thread pool to which
48 /// Arrow dispatches various CPU-bound tasks. This is an ideal number,
49 /// not necessarily the exact number of threads at a given point in time.
50 ///
51 /// You can change this number using SetCpuThreadPoolCapacity().
52 ARROW_EXPORT int GetCpuThreadPoolCapacity();
53
54 /// \brief Set the capacity of the global thread pool
55 ///
56 /// Set the number of worker threads int the thread pool to which
57 /// Arrow dispatches various CPU-bound tasks.
58 ///
59 /// The current number is returned by GetCpuThreadPoolCapacity().
60 ARROW_EXPORT Status SetCpuThreadPoolCapacity(int threads);
61
62 namespace internal {
63
/// \brief Optional scheduling hints that may accompany a spawned task.
///
/// An Executor is free to consult these hints; the provided ThreadPool
/// implementation ignores them. The 64-bit fields default to -1, which appears
/// to serve as an "unspecified" sentinel.
struct TaskHints {
  /// Scheduling priority: a smaller value means a more urgent task.
  int32_t priority{0};
  /// Expected IO transfer size, in bytes.
  int64_t io_size{-1};
  /// Rough CPU cost, expressed as a number of instructions.
  int64_t cpu_cost{-1};
  /// Identifier chosen by the application.
  int64_t external_id{-1};
};
76
77 class ARROW_EXPORT Executor {
78 public:
79 using StopCallback = internal::FnOnce<void(const Status&)>;
80
81 virtual ~Executor();
82
83 // Spawn a fire-and-forget task.
84 template <typename Function>
85 Status Spawn(Function&& func) {
86 return SpawnReal(TaskHints{}, std::forward<Function>(func), StopToken::Unstoppable(),
87 StopCallback{});
88 }
89 template <typename Function>
90 Status Spawn(Function&& func, StopToken stop_token) {
91 return SpawnReal(TaskHints{}, std::forward<Function>(func), std::move(stop_token),
92 StopCallback{});
93 }
94 template <typename Function>
95 Status Spawn(TaskHints hints, Function&& func) {
96 return SpawnReal(hints, std::forward<Function>(func), StopToken::Unstoppable(),
97 StopCallback{});
98 }
99 template <typename Function>
100 Status Spawn(TaskHints hints, Function&& func, StopToken stop_token) {
101 return SpawnReal(hints, std::forward<Function>(func), std::move(stop_token),
102 StopCallback{});
103 }
104 template <typename Function>
105 Status Spawn(TaskHints hints, Function&& func, StopToken stop_token,
106 StopCallback stop_callback) {
107 return SpawnReal(hints, std::forward<Function>(func), std::move(stop_token),
108 std::move(stop_callback));
109 }
110
111 // Transfers a future to this executor. Any continuations added to the
112 // returned future will run in this executor. Otherwise they would run
113 // on the same thread that called MarkFinished.
114 //
115 // This is necessary when (for example) an I/O task is completing a future.
116 // The continuations of that future should run on the CPU thread pool keeping
117 // CPU heavy work off the I/O thread pool. So the I/O task should transfer
118 // the future to the CPU executor before returning.
119 //
120 // By default this method will only transfer if the future is not already completed. If
121 // the future is already completed then any callback would be run synchronously and so
122 // no transfer is typically necessary. However, in cases where you want to force a
123 // transfer (e.g. to help the scheduler break up units of work across multiple cores)
124 // then you can override this behavior with `always_transfer`.
125 template <typename T>
126 Future<T> Transfer(Future<T> future) {
127 return DoTransfer(std::move(future), false);
128 }
129
130 // Overload of Transfer which will always schedule callbacks on new threads even if the
131 // future is finished when the callback is added.
132 //
133 // This can be useful in cases where you want to ensure parallelism
134 template <typename T>
135 Future<T> TransferAlways(Future<T> future) {
136 return DoTransfer(std::move(future), true);
137 }
138
139 // Submit a callable and arguments for execution. Return a future that
140 // will return the callable's result value once.
141 // The callable's arguments are copied before execution.
142 template <typename Function, typename... Args,
143 typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
144 Function && (Args && ...)>>
145 Result<FutureType> Submit(TaskHints hints, StopToken stop_token, Function&& func,
146 Args&&... args) {
147 using ValueType = typename FutureType::ValueType;
148
149 auto future = FutureType::Make();
150 auto task = std::bind(::arrow::detail::ContinueFuture{}, future,
151 std::forward<Function>(func), std::forward<Args>(args)...);
152 struct {
153 WeakFuture<ValueType> weak_fut;
154
155 void operator()(const Status& st) {
156 auto fut = weak_fut.get();
157 if (fut.is_valid()) {
158 fut.MarkFinished(st);
159 }
160 }
161 } stop_callback{WeakFuture<ValueType>(future)};
162 ARROW_RETURN_NOT_OK(SpawnReal(hints, std::move(task), std::move(stop_token),
163 std::move(stop_callback)));
164
165 return future;
166 }
167
168 template <typename Function, typename... Args,
169 typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
170 Function && (Args && ...)>>
171 Result<FutureType> Submit(StopToken stop_token, Function&& func, Args&&... args) {
172 return Submit(TaskHints{}, stop_token, std::forward<Function>(func),
173 std::forward<Args>(args)...);
174 }
175
176 template <typename Function, typename... Args,
177 typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
178 Function && (Args && ...)>>
179 Result<FutureType> Submit(TaskHints hints, Function&& func, Args&&... args) {
180 return Submit(std::move(hints), StopToken::Unstoppable(),
181 std::forward<Function>(func), std::forward<Args>(args)...);
182 }
183
184 template <typename Function, typename... Args,
185 typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
186 Function && (Args && ...)>>
187 Result<FutureType> Submit(Function&& func, Args&&... args) {
188 return Submit(TaskHints{}, StopToken::Unstoppable(), std::forward<Function>(func),
189 std::forward<Args>(args)...);
190 }
191
192 // Return the level of parallelism (the number of tasks that may be executed
193 // concurrently). This may be an approximate number.
194 virtual int GetCapacity() = 0;
195
196 // Return true if the thread from which this function is called is owned by this
197 // Executor. Returns false if this Executor does not support this property.
198 virtual bool OwnsThisThread() { return false; }
199
200 protected:
201 ARROW_DISALLOW_COPY_AND_ASSIGN(Executor);
202
203 Executor() = default;
204
205 template <typename T, typename FT = Future<T>, typename FTSync = typename FT::SyncType>
206 Future<T> DoTransfer(Future<T> future, bool always_transfer = false) {
207 auto transferred = Future<T>::Make();
208 if (always_transfer) {
209 CallbackOptions callback_options = CallbackOptions::Defaults();
210 callback_options.should_schedule = ShouldSchedule::Always;
211 callback_options.executor = this;
212 auto sync_callback = [transferred](const FTSync& result) mutable {
213 transferred.MarkFinished(result);
214 };
215 future.AddCallback(sync_callback, callback_options);
216 return transferred;
217 }
218
219 // We could use AddCallback's ShouldSchedule::IfUnfinished but we can save a bit of
220 // work by doing the test here.
221 auto callback = [this, transferred](const FTSync& result) mutable {
222 auto spawn_status =
223 Spawn([transferred, result]() mutable { transferred.MarkFinished(result); });
224 if (!spawn_status.ok()) {
225 transferred.MarkFinished(spawn_status);
226 }
227 };
228 auto callback_factory = [&callback]() { return callback; };
229 if (future.TryAddCallback(callback_factory)) {
230 return transferred;
231 }
232 // If the future is already finished and we aren't going to force spawn a thread
233 // then we don't need to add another layer of callback and can return the original
234 // future
235 return future;
236 }
237
238 // Subclassing API
239 virtual Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
240 StopCallback&&) = 0;
241 };
242
243 /// \brief An executor implementation that runs all tasks on a single thread using an
244 /// event loop.
245 ///
246 /// Note: Any sort of nested parallelism will deadlock this executor. Blocking waits are
247 /// fine but if one task needs to wait for another task it must be expressed as an
248 /// asynchronous continuation.
249 class ARROW_EXPORT SerialExecutor : public Executor {
250 public:
251 template <typename T = ::arrow::internal::Empty>
252 using TopLevelTask = internal::FnOnce<Future<T>(Executor*)>;
253
254 ~SerialExecutor() override;
255
256 int GetCapacity() override { return 1; };
257 Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
258 StopCallback&&) override;
259
260 /// \brief Runs the TopLevelTask and any scheduled tasks
261 ///
262 /// The TopLevelTask (or one of the tasks it schedules) must either return an invalid
263 /// status or call the finish signal. Failure to do this will result in a deadlock. For
264 /// this reason it is preferable (if possible) to use the helper methods (below)
265 /// RunSynchronously/RunSerially which delegates the responsiblity onto a Future
266 /// producer's existing responsibility to always mark a future finished (which can
267 /// someday be aided by ARROW-12207).
268 template <typename T = internal::Empty, typename FT = Future<T>,
269 typename FTSync = typename FT::SyncType>
270 static FTSync RunInSerialExecutor(TopLevelTask<T> initial_task) {
271 Future<T> fut = SerialExecutor().Run<T>(std::move(initial_task));
272 return FutureToSync(fut);
273 }
274
275 private:
276 SerialExecutor();
277
278 // State uses mutex
279 struct State;
280 std::shared_ptr<State> state_;
281
282 template <typename T, typename FTSync = typename Future<T>::SyncType>
283 Future<T> Run(TopLevelTask<T> initial_task) {
284 auto final_fut = std::move(initial_task)(this);
285 if (final_fut.is_finished()) {
286 return final_fut;
287 }
288 final_fut.AddCallback([this](const FTSync&) { MarkFinished(); });
289 RunLoop();
290 return final_fut;
291 }
292 void RunLoop();
293 void MarkFinished();
294 };
295
296 /// An Executor implementation spawning tasks in FIFO manner on a fixed-size
297 /// pool of worker threads.
298 ///
299 /// Note: Any sort of nested parallelism will deadlock this executor. Blocking waits are
300 /// fine but if one task needs to wait for another task it must be expressed as an
301 /// asynchronous continuation.
302 class ARROW_EXPORT ThreadPool : public Executor {
303 public:
304 // Construct a thread pool with the given number of worker threads
305 static Result<std::shared_ptr<ThreadPool>> Make(int threads);
306
307 // Like Make(), but takes care that the returned ThreadPool is compatible
308 // with destruction late at process exit.
309 static Result<std::shared_ptr<ThreadPool>> MakeEternal(int threads);
310
311 // Destroy thread pool; the pool will first be shut down
312 ~ThreadPool() override;
313
314 // Return the desired number of worker threads.
315 // The actual number of workers may lag a bit before being adjusted to
316 // match this value.
317 int GetCapacity() override;
318
319 bool OwnsThisThread() override;
320
321 // Return the number of tasks either running or in the queue.
322 int GetNumTasks();
323
324 // Dynamically change the number of worker threads.
325 //
326 // This function always returns immediately.
327 // If fewer threads are running than this number, new threads are spawned
328 // on-demand when needed for task execution.
329 // If more threads are running than this number, excess threads are reaped
330 // as soon as possible.
331 Status SetCapacity(int threads);
332
333 // Heuristic for the default capacity of a thread pool for CPU-bound tasks.
334 // This is exposed as a static method to help with testing.
335 static int DefaultCapacity();
336
337 // Shutdown the pool. Once the pool starts shutting down, new tasks
338 // cannot be submitted anymore.
339 // If "wait" is true, shutdown waits for all pending tasks to be finished.
340 // If "wait" is false, workers are stopped as soon as currently executing
341 // tasks are finished.
342 Status Shutdown(bool wait = true);
343
344 // Wait for the thread pool to become idle
345 //
346 // This is useful for sequencing tests
347 void WaitForIdle();
348
349 struct State;
350
351 protected:
352 FRIEND_TEST(TestThreadPool, SetCapacity);
353 FRIEND_TEST(TestGlobalThreadPool, Capacity);
354 friend ARROW_EXPORT ThreadPool* GetCpuThreadPool();
355
356 ThreadPool();
357
358 Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
359 StopCallback&&) override;
360
361 // Collect finished worker threads, making sure the OS threads have exited
362 void CollectFinishedWorkersUnlocked();
363 // Launch a given number of additional workers
364 void LaunchWorkersUnlocked(int threads);
365 // Get the current actual capacity
366 int GetActualCapacity();
367 // Reinitialize the thread pool if the pid changed
368 void ProtectAgainstFork();
369
370 static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
371
372 std::shared_ptr<State> sp_state_;
373 State* state_;
374 bool shutdown_on_destroy_;
375 #ifndef _WIN32
376 pid_t pid_;
377 #endif
378 };
379
380 // Return the process-global thread pool for CPU-bound tasks.
381 ARROW_EXPORT ThreadPool* GetCpuThreadPool();
382
383 /// \brief Potentially run an async operation serially (if use_threads is false)
384 /// \see RunSerially
385 ///
386 /// If `use_threads` is true, the global CPU executor is used.
387 /// If `use_threads` is false, a temporary SerialExecutor is used.
388 /// `get_future` is called (from this thread) with the chosen executor and must
389 /// return a future that will eventually finish. This function returns once the
390 /// future has finished.
391 template <typename Fut, typename ValueType = typename Fut::ValueType>
392 typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
393 bool use_threads) {
394 if (use_threads) {
395 auto fut = std::move(get_future)(GetCpuThreadPool());
396 return FutureToSync(fut);
397 } else {
398 return SerialExecutor::RunInSerialExecutor<ValueType>(std::move(get_future));
399 }
400 }
401
402 } // namespace internal
403 } // namespace arrow