]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/util/async_util_test.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / util / async_util_test.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/util/async_util.h"
19
20 #include <gtest/gtest.h>
21
22 #include "arrow/result.h"
23 #include "arrow/testing/future_util.h"
24 #include "arrow/testing/gtest_util.h"
25
26 namespace arrow {
27 namespace util {
28
29 class GatingDestroyable : public AsyncDestroyable {
30 public:
31 GatingDestroyable(Future<> close_future, bool* destroyed)
32 : close_future_(std::move(close_future)), destroyed_(destroyed) {}
33 ~GatingDestroyable() override { *destroyed_ = true; }
34
35 protected:
36 Future<> DoDestroy() override { return close_future_; }
37
38 private:
39 Future<> close_future_;
40 bool* destroyed_;
41 };
42
43 template <typename Factory>
44 void TestAsyncDestroyable(Factory factory) {
45 Future<> gate = Future<>::Make();
46 bool destroyed = false;
47 bool on_closed = false;
48 {
49 auto obj = factory(gate, &destroyed);
50 obj->on_closed().AddCallback([&](const Status& st) { on_closed = true; });
51 ASSERT_FALSE(destroyed);
52 }
53 ASSERT_FALSE(destroyed);
54 ASSERT_FALSE(on_closed);
55 gate.MarkFinished();
56 ASSERT_TRUE(destroyed);
57 ASSERT_TRUE(on_closed);
58 }
59
60 TEST(AsyncDestroyable, MakeShared) {
61 TestAsyncDestroyable([](Future<> gate, bool* destroyed) {
62 return MakeSharedAsync<GatingDestroyable>(gate, destroyed);
63 });
64 }
65
66 // The next four tests are corner cases but can sometimes occur when using these types
67 // in standard containers on certain versions of the compiler/cpplib. Basically we
68 // want to make sure our deleter is ok with null pointers.
69 TEST(AsyncDestroyable, DefaultUnique) {
70 std::unique_ptr<GatingDestroyable, DestroyingDeleter<GatingDestroyable>> default_ptr;
71 default_ptr.reset();
72 }
73
74 TEST(AsyncDestroyable, NullUnique) {
75 std::unique_ptr<GatingDestroyable, DestroyingDeleter<GatingDestroyable>> null_ptr(
76 nullptr);
77 null_ptr.reset();
78 }
79
80 TEST(AsyncDestroyable, NullShared) {
81 std::shared_ptr<GatingDestroyable> null_ptr(nullptr,
82 DestroyingDeleter<GatingDestroyable>());
83 null_ptr.reset();
84 }
85
86 TEST(AsyncDestroyable, NullUniqueToShared) {
87 std::unique_ptr<GatingDestroyable, DestroyingDeleter<GatingDestroyable>> null_ptr(
88 nullptr);
89 std::shared_ptr<GatingDestroyable> null_shared = std::move(null_ptr);
90 null_shared.reset();
91 }
92
93 TEST(AsyncDestroyable, MakeUnique) {
94 TestAsyncDestroyable([](Future<> gate, bool* destroyed) {
95 return MakeUniqueAsync<GatingDestroyable>(gate, destroyed);
96 });
97 }
98
99 template <typename T>
100 class TypedTestAsyncTaskGroup : public ::testing::Test {};
101
102 using AsyncTaskGroupTypes = ::testing::Types<AsyncTaskGroup, SerializedAsyncTaskGroup>;
103
104 TYPED_TEST_SUITE(TypedTestAsyncTaskGroup, AsyncTaskGroupTypes);
105
106 TYPED_TEST(TypedTestAsyncTaskGroup, Basic) {
107 TypeParam task_group;
108 Future<> fut1 = Future<>::Make();
109 Future<> fut2 = Future<>::Make();
110 ASSERT_OK(task_group.AddTask([fut1]() { return fut1; }));
111 ASSERT_OK(task_group.AddTask([fut2]() { return fut2; }));
112 Future<> all_done = task_group.End();
113 AssertNotFinished(all_done);
114 fut1.MarkFinished();
115 AssertNotFinished(all_done);
116 fut2.MarkFinished();
117 ASSERT_FINISHES_OK(all_done);
118 }
119
120 TYPED_TEST(TypedTestAsyncTaskGroup, NoTasks) {
121 TypeParam task_group;
122 ASSERT_FINISHES_OK(task_group.End());
123 }
124
125 TYPED_TEST(TypedTestAsyncTaskGroup, OnFinishedDoesNotEnd) {
126 TypeParam task_group;
127 Future<> on_finished = task_group.OnFinished();
128 AssertNotFinished(on_finished);
129 ASSERT_FINISHES_OK(task_group.End());
130 ASSERT_FINISHES_OK(on_finished);
131 }
132
133 TYPED_TEST(TypedTestAsyncTaskGroup, AddAfterDone) {
134 TypeParam task_group;
135 ASSERT_FINISHES_OK(task_group.End());
136 ASSERT_RAISES(Invalid, task_group.AddTask([] { return Future<>::Make(); }));
137 }
138
139 TYPED_TEST(TypedTestAsyncTaskGroup, AddAfterWaitButBeforeFinish) {
140 TypeParam task_group;
141 Future<> task_one = Future<>::Make();
142 ASSERT_OK(task_group.AddTask([task_one] { return task_one; }));
143 Future<> finish_fut = task_group.End();
144 AssertNotFinished(finish_fut);
145 Future<> task_two = Future<>::Make();
146 ASSERT_OK(task_group.AddTask([task_two] { return task_two; }));
147 AssertNotFinished(finish_fut);
148 task_one.MarkFinished();
149 AssertNotFinished(finish_fut);
150 task_two.MarkFinished();
151 AssertFinished(finish_fut);
152 ASSERT_FINISHES_OK(finish_fut);
153 }
154
155 TYPED_TEST(TypedTestAsyncTaskGroup, Error) {
156 TypeParam task_group;
157 Future<> failed_task = Future<>::MakeFinished(Status::Invalid("XYZ"));
158 ASSERT_RAISES(Invalid, task_group.AddTask([failed_task] { return failed_task; }));
159 ASSERT_FINISHES_AND_RAISES(Invalid, task_group.End());
160 }
161
162 TYPED_TEST(TypedTestAsyncTaskGroup, TaskFactoryFails) {
163 TypeParam task_group;
164 ASSERT_RAISES(Invalid, task_group.AddTask([] { return Status::Invalid("XYZ"); }));
165 ASSERT_RAISES(Invalid, task_group.AddTask([] { return Future<>::Make(); }));
166 ASSERT_FINISHES_AND_RAISES(Invalid, task_group.End());
167 }
168
169 TYPED_TEST(TypedTestAsyncTaskGroup, AddAfterFailed) {
170 TypeParam task_group;
171 ASSERT_RAISES(Invalid, task_group.AddTask([] {
172 return Future<>::MakeFinished(Status::Invalid("XYZ"));
173 }));
174 ASSERT_RAISES(Invalid, task_group.AddTask([] { return Future<>::Make(); }));
175 ASSERT_FINISHES_AND_RAISES(Invalid, task_group.End());
176 }
177
178 TEST(StandardAsyncTaskGroup, TaskFinishesAfterError) {
179 AsyncTaskGroup task_group;
180 Future<> fut1 = Future<>::Make();
181 ASSERT_OK(task_group.AddTask([fut1] { return fut1; }));
182 ASSERT_RAISES(Invalid, task_group.AddTask([] {
183 return Future<>::MakeFinished(Status::Invalid("XYZ"));
184 }));
185 Future<> finished_fut = task_group.End();
186 AssertNotFinished(finished_fut);
187 fut1.MarkFinished();
188 ASSERT_FINISHES_AND_RAISES(Invalid, finished_fut);
189 }
190
191 TEST(StandardAsyncTaskGroup, FailAfterAdd) {
192 AsyncTaskGroup task_group;
193 Future<> will_fail = Future<>::Make();
194 ASSERT_OK(task_group.AddTask([will_fail] { return will_fail; }));
195 Future<> added_later_and_passes = Future<>::Make();
196 ASSERT_OK(
197 task_group.AddTask([added_later_and_passes] { return added_later_and_passes; }));
198 will_fail.MarkFinished(Status::Invalid("XYZ"));
199 ASSERT_RAISES(Invalid, task_group.AddTask([] { return Future<>::Make(); }));
200 Future<> finished_fut = task_group.End();
201 AssertNotFinished(finished_fut);
202 added_later_and_passes.MarkFinished();
203 AssertFinished(finished_fut);
204 ASSERT_FINISHES_AND_RAISES(Invalid, finished_fut);
205 }
206
207 // The serialized task group can never really get into a "fail after add" scenario
208 // because there is no parallelism. So the behavior is a little unique in these scenarios
209
210 TEST(SerializedAsyncTaskGroup, TaskFinishesAfterError) {
211 SerializedAsyncTaskGroup task_group;
212 Future<> fut1 = Future<>::Make();
213 ASSERT_OK(task_group.AddTask([fut1] { return fut1; }));
214 ASSERT_OK(
215 task_group.AddTask([] { return Future<>::MakeFinished(Status::Invalid("XYZ")); }));
216 Future<> finished_fut = task_group.End();
217 AssertNotFinished(finished_fut);
218 fut1.MarkFinished();
219 ASSERT_FINISHES_AND_RAISES(Invalid, finished_fut);
220 }
221
222 TEST(SerializedAsyncTaskGroup, FailAfterAdd) {
223 SerializedAsyncTaskGroup task_group;
224 Future<> will_fail = Future<>::Make();
225 ASSERT_OK(task_group.AddTask([will_fail] { return will_fail; }));
226 Future<> added_later_and_passes = Future<>::Make();
227 bool added_later_and_passes_created = false;
228 ASSERT_OK(task_group.AddTask([added_later_and_passes, &added_later_and_passes_created] {
229 added_later_and_passes_created = true;
230 return added_later_and_passes;
231 }));
232 will_fail.MarkFinished(Status::Invalid("XYZ"));
233 ASSERT_RAISES(Invalid, task_group.AddTask([] { return Future<>::Make(); }));
234 ASSERT_FINISHES_AND_RAISES(Invalid, task_group.End());
235 ASSERT_FALSE(added_later_and_passes_created);
236 }
237
238 } // namespace util
239 } // namespace arrow