1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 #include "arrow/util/async_util.h"
20 #include <gtest/gtest.h>
22 #include "arrow/result.h"
23 #include "arrow/testing/future_util.h"
24 #include "arrow/testing/gtest_util.h"
29 class GatingDestroyable
: public AsyncDestroyable
{
31 GatingDestroyable(Future
<> close_future
, bool* destroyed
)
32 : close_future_(std::move(close_future
)), destroyed_(destroyed
) {}
33 ~GatingDestroyable() override
{ *destroyed_
= true; }
36 Future
<> DoDestroy() override
{ return close_future_
; }
39 Future
<> close_future_
;
43 template <typename Factory
>
44 void TestAsyncDestroyable(Factory factory
) {
45 Future
<> gate
= Future
<>::Make();
46 bool destroyed
= false;
47 bool on_closed
= false;
49 auto obj
= factory(gate
, &destroyed
);
50 obj
->on_closed().AddCallback([&](const Status
& st
) { on_closed
= true; });
51 ASSERT_FALSE(destroyed
);
53 ASSERT_FALSE(destroyed
);
54 ASSERT_FALSE(on_closed
);
56 ASSERT_TRUE(destroyed
);
57 ASSERT_TRUE(on_closed
);
60 TEST(AsyncDestroyable
, MakeShared
) {
61 TestAsyncDestroyable([](Future
<> gate
, bool* destroyed
) {
62 return MakeSharedAsync
<GatingDestroyable
>(gate
, destroyed
);
// The next four tests cover corner cases that can nevertheless occur when these types
// are used in standard containers with certain versions of the compiler / C++ standard
// library. Basically we want to make sure our deleter is OK with null pointers.
69 TEST(AsyncDestroyable
, DefaultUnique
) {
70 std::unique_ptr
<GatingDestroyable
, DestroyingDeleter
<GatingDestroyable
>> default_ptr
;
74 TEST(AsyncDestroyable
, NullUnique
) {
75 std::unique_ptr
<GatingDestroyable
, DestroyingDeleter
<GatingDestroyable
>> null_ptr(
80 TEST(AsyncDestroyable
, NullShared
) {
81 std::shared_ptr
<GatingDestroyable
> null_ptr(nullptr,
82 DestroyingDeleter
<GatingDestroyable
>());
86 TEST(AsyncDestroyable
, NullUniqueToShared
) {
87 std::unique_ptr
<GatingDestroyable
, DestroyingDeleter
<GatingDestroyable
>> null_ptr(
89 std::shared_ptr
<GatingDestroyable
> null_shared
= std::move(null_ptr
);
93 TEST(AsyncDestroyable
, MakeUnique
) {
94 TestAsyncDestroyable([](Future
<> gate
, bool* destroyed
) {
95 return MakeUniqueAsync
<GatingDestroyable
>(gate
, destroyed
);
100 class TypedTestAsyncTaskGroup
: public ::testing::Test
{};
102 using AsyncTaskGroupTypes
= ::testing::Types
<AsyncTaskGroup
, SerializedAsyncTaskGroup
>;
104 TYPED_TEST_SUITE(TypedTestAsyncTaskGroup
, AsyncTaskGroupTypes
);
106 TYPED_TEST(TypedTestAsyncTaskGroup
, Basic
) {
107 TypeParam task_group
;
108 Future
<> fut1
= Future
<>::Make();
109 Future
<> fut2
= Future
<>::Make();
110 ASSERT_OK(task_group
.AddTask([fut1
]() { return fut1
; }));
111 ASSERT_OK(task_group
.AddTask([fut2
]() { return fut2
; }));
112 Future
<> all_done
= task_group
.End();
113 AssertNotFinished(all_done
);
115 AssertNotFinished(all_done
);
117 ASSERT_FINISHES_OK(all_done
);
120 TYPED_TEST(TypedTestAsyncTaskGroup
, NoTasks
) {
121 TypeParam task_group
;
122 ASSERT_FINISHES_OK(task_group
.End());
125 TYPED_TEST(TypedTestAsyncTaskGroup
, OnFinishedDoesNotEnd
) {
126 TypeParam task_group
;
127 Future
<> on_finished
= task_group
.OnFinished();
128 AssertNotFinished(on_finished
);
129 ASSERT_FINISHES_OK(task_group
.End());
130 ASSERT_FINISHES_OK(on_finished
);
133 TYPED_TEST(TypedTestAsyncTaskGroup
, AddAfterDone
) {
134 TypeParam task_group
;
135 ASSERT_FINISHES_OK(task_group
.End());
136 ASSERT_RAISES(Invalid
, task_group
.AddTask([] { return Future
<>::Make(); }));
139 TYPED_TEST(TypedTestAsyncTaskGroup
, AddAfterWaitButBeforeFinish
) {
140 TypeParam task_group
;
141 Future
<> task_one
= Future
<>::Make();
142 ASSERT_OK(task_group
.AddTask([task_one
] { return task_one
; }));
143 Future
<> finish_fut
= task_group
.End();
144 AssertNotFinished(finish_fut
);
145 Future
<> task_two
= Future
<>::Make();
146 ASSERT_OK(task_group
.AddTask([task_two
] { return task_two
; }));
147 AssertNotFinished(finish_fut
);
148 task_one
.MarkFinished();
149 AssertNotFinished(finish_fut
);
150 task_two
.MarkFinished();
151 AssertFinished(finish_fut
);
152 ASSERT_FINISHES_OK(finish_fut
);
155 TYPED_TEST(TypedTestAsyncTaskGroup
, Error
) {
156 TypeParam task_group
;
157 Future
<> failed_task
= Future
<>::MakeFinished(Status::Invalid("XYZ"));
158 ASSERT_RAISES(Invalid
, task_group
.AddTask([failed_task
] { return failed_task
; }));
159 ASSERT_FINISHES_AND_RAISES(Invalid
, task_group
.End());
162 TYPED_TEST(TypedTestAsyncTaskGroup
, TaskFactoryFails
) {
163 TypeParam task_group
;
164 ASSERT_RAISES(Invalid
, task_group
.AddTask([] { return Status::Invalid("XYZ"); }));
165 ASSERT_RAISES(Invalid
, task_group
.AddTask([] { return Future
<>::Make(); }));
166 ASSERT_FINISHES_AND_RAISES(Invalid
, task_group
.End());
169 TYPED_TEST(TypedTestAsyncTaskGroup
, AddAfterFailed
) {
170 TypeParam task_group
;
171 ASSERT_RAISES(Invalid
, task_group
.AddTask([] {
172 return Future
<>::MakeFinished(Status::Invalid("XYZ"));
174 ASSERT_RAISES(Invalid
, task_group
.AddTask([] { return Future
<>::Make(); }));
175 ASSERT_FINISHES_AND_RAISES(Invalid
, task_group
.End());
178 TEST(StandardAsyncTaskGroup
, TaskFinishesAfterError
) {
179 AsyncTaskGroup task_group
;
180 Future
<> fut1
= Future
<>::Make();
181 ASSERT_OK(task_group
.AddTask([fut1
] { return fut1
; }));
182 ASSERT_RAISES(Invalid
, task_group
.AddTask([] {
183 return Future
<>::MakeFinished(Status::Invalid("XYZ"));
185 Future
<> finished_fut
= task_group
.End();
186 AssertNotFinished(finished_fut
);
188 ASSERT_FINISHES_AND_RAISES(Invalid
, finished_fut
);
191 TEST(StandardAsyncTaskGroup
, FailAfterAdd
) {
192 AsyncTaskGroup task_group
;
193 Future
<> will_fail
= Future
<>::Make();
194 ASSERT_OK(task_group
.AddTask([will_fail
] { return will_fail
; }));
195 Future
<> added_later_and_passes
= Future
<>::Make();
197 task_group
.AddTask([added_later_and_passes
] { return added_later_and_passes
; }));
198 will_fail
.MarkFinished(Status::Invalid("XYZ"));
199 ASSERT_RAISES(Invalid
, task_group
.AddTask([] { return Future
<>::Make(); }));
200 Future
<> finished_fut
= task_group
.End();
201 AssertNotFinished(finished_fut
);
202 added_later_and_passes
.MarkFinished();
203 AssertFinished(finished_fut
);
204 ASSERT_FINISHES_AND_RAISES(Invalid
, finished_fut
);
// The serialized task group can never really get into a "fail after add" scenario
// because there is no parallelism, so its behavior differs slightly in these scenarios.
210 TEST(SerializedAsyncTaskGroup
, TaskFinishesAfterError
) {
211 SerializedAsyncTaskGroup task_group
;
212 Future
<> fut1
= Future
<>::Make();
213 ASSERT_OK(task_group
.AddTask([fut1
] { return fut1
; }));
215 task_group
.AddTask([] { return Future
<>::MakeFinished(Status::Invalid("XYZ")); }));
216 Future
<> finished_fut
= task_group
.End();
217 AssertNotFinished(finished_fut
);
219 ASSERT_FINISHES_AND_RAISES(Invalid
, finished_fut
);
222 TEST(SerializedAsyncTaskGroup
, FailAfterAdd
) {
223 SerializedAsyncTaskGroup task_group
;
224 Future
<> will_fail
= Future
<>::Make();
225 ASSERT_OK(task_group
.AddTask([will_fail
] { return will_fail
; }));
226 Future
<> added_later_and_passes
= Future
<>::Make();
227 bool added_later_and_passes_created
= false;
228 ASSERT_OK(task_group
.AddTask([added_later_and_passes
, &added_later_and_passes_created
] {
229 added_later_and_passes_created
= true;
230 return added_later_and_passes
;
232 will_fail
.MarkFinished(Status::Invalid("XYZ"));
233 ASSERT_RAISES(Invalid
, task_group
.AddTask([] { return Future
<>::Make(); }));
234 ASSERT_FINISHES_AND_RAISES(Invalid
, task_group
.End());
235 ASSERT_FALSE(added_later_and_passes_created
);