]>
git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/util/task_group.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 #include "arrow/util/task_group.h"
21 #include <condition_variable>
26 #include "arrow/util/checked_cast.h"
27 #include "arrow/util/logging.h"
28 #include "arrow/util/thread_pool.h"
35 ////////////////////////////////////////////////////////////////////////
36 // Serial TaskGroup implementation
38 class SerialTaskGroup
: public TaskGroup
{
40 explicit SerialTaskGroup(StopToken stop_token
) : stop_token_(std::move(stop_token
)) {}
42 void AppendReal(FnOnce
<Status()> task
) override
{
44 if (stop_token_
.IsStopRequested()) {
45 status_
&= stop_token_
.Poll();
49 status_
&= std::move(task
)();
53 Status
current_status() override
{ return status_
; }
55 bool ok() const override
{ return status_
.ok(); }
57 Status
Finish() override
{
64 Future
<> FinishAsync() override
{ return Future
<>::MakeFinished(Finish()); }
66 int parallelism() override
{ return 1; }
68 StopToken stop_token_
;
70 bool finished_
= false;
73 ////////////////////////////////////////////////////////////////////////
74 // Threaded TaskGroup implementation
76 class ThreadedTaskGroup
: public TaskGroup
{
78 ThreadedTaskGroup(Executor
* executor
, StopToken stop_token
)
79 : executor_(executor
),
80 stop_token_(std::move(stop_token
)),
84 ~ThreadedTaskGroup() override
{
85 // Make sure all pending tasks are finished, so that dangling references
86 // to this don't persist.
87 ARROW_UNUSED(Finish());
90 void AppendReal(FnOnce
<Status()> task
) override
{
92 if (stop_token_
.IsStopRequested()) {
93 UpdateStatus(stop_token_
.Poll());
97 // The hot path is unlocked thanks to atomics
98 // Only if an error occurs is the lock taken
99 if (ok_
.load(std::memory_order_acquire
)) {
100 nremaining_
.fetch_add(1, std::memory_order_acquire
);
102 auto self
= checked_pointer_cast
<ThreadedTaskGroup
>(shared_from_this());
106 if (self_
->ok_
.load(std::memory_order_acquire
)) {
108 if (stop_token_
.IsStopRequested()) {
109 st
= stop_token_
.Poll();
111 // XXX what about exceptions?
112 st
= std::move(task_
)();
114 self_
->UpdateStatus(std::move(st
));
116 self_
->OneTaskDone();
119 std::shared_ptr
<ThreadedTaskGroup
> self_
;
120 FnOnce
<Status()> task_
;
121 StopToken stop_token_
;
125 executor_
->Spawn(Callable
{std::move(self
), std::move(task
), stop_token_
});
126 UpdateStatus(std::move(st
));
130 Status
current_status() override
{
131 std::lock_guard
<std::mutex
> lock(mutex_
);
135 bool ok() const override
{ return ok_
.load(); }
137 Status
Finish() override
{
138 std::unique_lock
<std::mutex
> lock(mutex_
);
140 cv_
.wait(lock
, [&]() { return nremaining_
.load() == 0; });
141 // Current tasks may start other tasks, so only set this when done
147 Future
<> FinishAsync() override
{
148 std::lock_guard
<std::mutex
> lock(mutex_
);
149 if (!completion_future_
.has_value()) {
150 if (nremaining_
.load() == 0) {
151 completion_future_
= Future
<>::MakeFinished(status_
);
153 completion_future_
= Future
<>::Make();
156 return *completion_future_
;
159 int parallelism() override
{ return executor_
->GetCapacity(); }
162 void UpdateStatus(Status
&& st
) {
163 // Must be called unlocked, only locks on error
164 if (ARROW_PREDICT_FALSE(!st
.ok())) {
165 std::lock_guard
<std::mutex
> lock(mutex_
);
166 ok_
.store(false, std::memory_order_release
);
167 status_
&= std::move(st
);
172 // Can be called unlocked thanks to atomics
173 auto nremaining
= nremaining_
.fetch_sub(1, std::memory_order_release
) - 1;
174 DCHECK_GE(nremaining
, 0);
175 if (nremaining
== 0) {
176 // Take the lock so that ~ThreadedTaskGroup cannot destroy cv
177 // before cv.notify_one() has returned
178 std::unique_lock
<std::mutex
> lock(mutex_
);
180 if (completion_future_
.has_value()) {
181 // MarkFinished could be slow. We don't want to call it while we are holding
183 auto& future
= *completion_future_
;
184 const auto finished
= completion_future_
->is_finished();
185 const auto& status
= status_
;
186 // This will be redundant if the user calls Finish and not FinishAsync
187 if (!finished
&& !finished_
) {
190 future
.MarkFinished(status
);
198 // These members are usable unlocked
200 StopToken stop_token_
;
201 std::atomic
<int32_t> nremaining_
;
202 std::atomic
<bool> ok_
;
204 // These members use locking
206 std::condition_variable cv_
;
208 bool finished_
= false;
209 util::optional
<Future
<>> completion_future_
;
214 std::shared_ptr
<TaskGroup
> TaskGroup::MakeSerial(StopToken stop_token
) {
215 return std::shared_ptr
<TaskGroup
>(new SerialTaskGroup
{stop_token
});
218 std::shared_ptr
<TaskGroup
> TaskGroup::MakeThreaded(Executor
* thread_pool
,
219 StopToken stop_token
) {
220 return std::shared_ptr
<TaskGroup
>(new ThreadedTaskGroup
{thread_pool
, stop_token
});
223 } // namespace internal