]>
Commit | Line | Data |
---|---|---|
fe692bf9 FG |
1 | use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task}; |
2 | use crate::runtime::tests::NoopSchedule; | |
5099ac24 FG |
3 | use crate::util::TryLock; |
4 | ||
5 | use std::collections::VecDeque; | |
fe692bf9 FG |
6 | use std::future::Future; |
7 | use std::sync::atomic::{AtomicBool, Ordering}; | |
5099ac24 FG |
8 | use std::sync::Arc; |
9 | ||
fe692bf9 FG |
/// Observer half of an `AssertDrop` pair.
///
/// Lets a test check whether the paired `AssertDrop` value has been dropped.
struct AssertDropHandle {
    /// Flag shared with the paired `AssertDrop`; its `Drop` impl stores `true`.
    is_dropped: Arc<AtomicBool>,
}

impl AssertDropHandle {
    /// Panics unless the paired `AssertDrop` has already been dropped.
    ///
    /// `#[track_caller]` makes the panic location point at the calling test.
    #[track_caller]
    fn assert_dropped(&self) {
        assert!(self.is_dropped.load(Ordering::SeqCst));
    }

    /// Panics if the paired `AssertDrop` has already been dropped.
    #[track_caller]
    fn assert_not_dropped(&self) {
        assert!(!self.is_dropped.load(Ordering::SeqCst));
    }
}

/// Value that records its own destruction in a flag shared with an
/// `AssertDropHandle`.
struct AssertDrop {
    /// Set to `true` when this value is dropped.
    is_dropped: Arc<AtomicBool>,
}

impl AssertDrop {
    /// Creates a fresh `(AssertDrop, AssertDropHandle)` pair whose shared flag
    /// starts out as `false`.
    fn new() -> (Self, AssertDropHandle) {
        let shared = Arc::new(AtomicBool::new(false));
        (
            AssertDrop {
                is_dropped: Arc::clone(&shared),
            },
            // The last use moves the `Arc` instead of cloning it a second time.
            AssertDropHandle { is_dropped: shared },
        )
    }
}

impl Drop for AssertDrop {
    /// Publishes the drop to the paired handle.
    fn drop(&mut self) {
        self.is_dropped.store(true, Ordering::SeqCst);
    }
}
46 | ||
// A Notified does not shut down on drop, but it is dropped once the ref-count
// hits zero.
//
// Here the notified half is dropped first; the future (which owns `ad`) must
// stay alive until the join handle is dropped as well.
#[test]
fn create_drop1() {
    let (ad, handle) = AssertDrop::new();
    let (notified, join) = unowned(
        async {
            // The future is never run in this test; it only exists to own `ad`.
            drop(ad);
            unreachable!()
        },
        NoopSchedule,
        Id::next(),
    );
    drop(notified);
    handle.assert_not_dropped();
    drop(join);
    handle.assert_dropped();
}
65 | ||
// Same as `create_drop1`, but the join handle is dropped before the notified
// half: the future must stay alive until the notified half is dropped too.
#[test]
fn create_drop2() {
    let (ad, handle) = AssertDrop::new();
    let (notified, join) = unowned(
        async {
            // The future is never run in this test; it only exists to own `ad`.
            drop(ad);
            unreachable!()
        },
        NoopSchedule,
        Id::next(),
    );
    drop(join);
    handle.assert_not_dropped();
    drop(notified);
    handle.assert_dropped();
}
82 | ||
// An abort handle obtained from the join handle also keeps the future alive:
// after dropping the join handle and the notified half, the future is only
// dropped once the abort handle goes away.
#[test]
fn drop_abort_handle1() {
    let (ad, handle) = AssertDrop::new();
    let (notified, join) = unowned(
        async {
            // The future is never run in this test; it only exists to own `ad`.
            drop(ad);
            unreachable!()
        },
        NoopSchedule,
        Id::next(),
    );
    let abort = join.abort_handle();
    drop(join);
    handle.assert_not_dropped();
    drop(notified);
    handle.assert_not_dropped();
    drop(abort);
    handle.assert_dropped();
}
102 | ||
// Variant of `drop_abort_handle1` with a different drop order: notified half,
// then abort handle, then join handle. The future must survive until the last
// of the three is dropped.
#[test]
fn drop_abort_handle2() {
    let (ad, handle) = AssertDrop::new();
    let (notified, join) = unowned(
        async {
            // The future is never run in this test; it only exists to own `ad`.
            drop(ad);
            unreachable!()
        },
        NoopSchedule,
        Id::next(),
    );
    let abort = join.abort_handle();
    drop(notified);
    handle.assert_not_dropped();
    drop(abort);
    handle.assert_not_dropped();
    drop(join);
    handle.assert_dropped();
}
122 | ||
// Shutting down through Notified works
//
// The join handle is dropped first; the future must still be alive until
// `shutdown` is called on the notified half.
#[test]
fn create_shutdown1() {
    let (ad, handle) = AssertDrop::new();
    let (notified, join) = unowned(
        async {
            // The future is never run in this test; it only exists to own `ad`.
            drop(ad);
            unreachable!()
        },
        NoopSchedule,
        Id::next(),
    );
    drop(join);
    handle.assert_not_dropped();
    notified.shutdown();
    handle.assert_dropped();
}
140 | ||
// Shutting down through the notified half drops the future even while the
// join handle is still alive; the join handle is only dropped afterwards.
#[test]
fn create_shutdown2() {
    let (ad, handle) = AssertDrop::new();
    let (notified, join) = unowned(
        async {
            // The future is never run in this test; it only exists to own `ad`.
            drop(ad);
            unreachable!()
        },
        NoopSchedule,
        Id::next(),
    );
    handle.assert_not_dropped();
    notified.shutdown();
    handle.assert_dropped();
    drop(join);
}
157 | ||
// Running an unowned task created from an empty future must not panic. The
// join handle is discarded immediately via the `_` pattern.
#[test]
fn unowned_poll() {
    let (task, _) = unowned(async {}, NoopSchedule, Id::next());
    task.run();
}
163 | ||
// Spawns a task that yields once and expects `tick` to report exactly two
// task runs before the queue drains (presumably the initial poll plus the
// re-poll after `yield_now` -- confirm against `yield_now`'s behavior).
#[test]
fn schedule() {
    with(|rt| {
        rt.spawn(async {
            crate::task::yield_now().await;
        });

        assert_eq!(2, rt.tick());
        rt.shutdown();
    })
}
175 | ||
// Shuts the runtime down after a never-finishing task has been run once.
// `Runtime::shutdown` itself asserts that no owned tasks remain.
#[test]
fn shutdown() {
    with(|rt| {
        rt.spawn(async {
            loop {
                crate::task::yield_now().await;
            }
        });

        rt.tick_max(1);

        rt.shutdown();
    })
}
190 | ||
fe692bf9 FG |
// Shuts the runtime down while the spawned task has never been run.
// `Runtime::shutdown` itself asserts that no owned tasks remain.
#[test]
fn shutdown_immediately() {
    with(|rt| {
        rt.spawn(async {
            loop {
                crate::task::yield_now().await;
            }
        });

        rt.shutdown();
    })
}
203 | ||
// A task whose future owns a guard that spawns a new task from its `Drop`
// impl. The test checks that the guard's destructor ran (and therefore that
// the spawn was attempted) and relies on `Runtime::shutdown`'s own assertion
// that no owned tasks remain afterwards.
#[test]
fn spawn_during_shutdown() {
    // Records that `SpawnOnDrop::drop` executed.
    static DID_SPAWN: AtomicBool = AtomicBool::new(false);

    // Guard holding a runtime handle; spawns an empty task when dropped.
    struct SpawnOnDrop(Runtime);
    impl Drop for SpawnOnDrop {
        fn drop(&mut self) {
            DID_SPAWN.store(true, Ordering::SeqCst);
            self.0.spawn(async {});
        }
    }

    with(|rt| {
        let rt2 = rt.clone();
        rt.spawn(async move {
            let _spawn_on_drop = SpawnOnDrop(rt2);

            loop {
                crate::task::yield_now().await;
            }
        });

        // Run the task once so the guard has been created inside the future
        // before the runtime is shut down.
        rt.tick_max(1);
        rt.shutdown();
    });

    assert!(DID_SPAWN.load(Ordering::SeqCst));
}
232 | ||
5099ac24 FG |
233 | fn with(f: impl FnOnce(Runtime)) { |
234 | struct Reset; | |
235 | ||
236 | impl Drop for Reset { | |
237 | fn drop(&mut self) { | |
238 | let _rt = CURRENT.try_lock().unwrap().take(); | |
239 | } | |
240 | } | |
241 | ||
242 | let _reset = Reset; | |
243 | ||
244 | let rt = Runtime(Arc::new(Inner { | |
fe692bf9 | 245 | owned: OwnedTasks::new(), |
5099ac24 FG |
246 | core: TryLock::new(Core { |
247 | queue: VecDeque::new(), | |
5099ac24 FG |
248 | }), |
249 | })); | |
250 | ||
251 | *CURRENT.try_lock().unwrap() = Some(rt.clone()); | |
252 | f(rt) | |
253 | } | |
254 | ||
255 | #[derive(Clone)] | |
256 | struct Runtime(Arc<Inner>); | |
257 | ||
258 | struct Inner { | |
5099ac24 | 259 | core: TryLock<Core>, |
fe692bf9 | 260 | owned: OwnedTasks<Runtime>, |
5099ac24 FG |
261 | } |
262 | ||
263 | struct Core { | |
264 | queue: VecDeque<task::Notified<Runtime>>, | |
5099ac24 FG |
265 | } |
266 | ||
267 | static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None); | |
268 | ||
269 | impl Runtime { | |
fe692bf9 FG |
270 | fn spawn<T>(&self, future: T) -> JoinHandle<T::Output> |
271 | where | |
272 | T: 'static + Send + Future, | |
273 | T::Output: 'static + Send, | |
274 | { | |
275 | let (handle, notified) = self.0.owned.bind(future, self.clone(), Id::next()); | |
276 | ||
277 | if let Some(notified) = notified { | |
278 | self.schedule(notified); | |
279 | } | |
280 | ||
281 | handle | |
282 | } | |
283 | ||
5099ac24 FG |
284 | fn tick(&self) -> usize { |
285 | self.tick_max(usize::MAX) | |
286 | } | |
287 | ||
288 | fn tick_max(&self, max: usize) -> usize { | |
289 | let mut n = 0; | |
290 | ||
291 | while !self.is_empty() && n < max { | |
292 | let task = self.next_task(); | |
293 | n += 1; | |
fe692bf9 | 294 | let task = self.0.owned.assert_owner(task); |
5099ac24 FG |
295 | task.run(); |
296 | } | |
297 | ||
5099ac24 FG |
298 | n |
299 | } | |
300 | ||
301 | fn is_empty(&self) -> bool { | |
302 | self.0.core.try_lock().unwrap().queue.is_empty() | |
303 | } | |
304 | ||
305 | fn next_task(&self) -> task::Notified<Runtime> { | |
306 | self.0.core.try_lock().unwrap().queue.pop_front().unwrap() | |
307 | } | |
308 | ||
309 | fn shutdown(&self) { | |
310 | let mut core = self.0.core.try_lock().unwrap(); | |
311 | ||
fe692bf9 | 312 | self.0.owned.close_and_shutdown_all(); |
5099ac24 FG |
313 | |
314 | while let Some(task) = core.queue.pop_back() { | |
fe692bf9 | 315 | drop(task); |
5099ac24 FG |
316 | } |
317 | ||
318 | drop(core); | |
319 | ||
fe692bf9 | 320 | assert!(self.0.owned.is_empty()); |
5099ac24 FG |
321 | } |
322 | } | |
323 | ||
324 | impl Schedule for Runtime { | |
5099ac24 | 325 | fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { |
fe692bf9 | 326 | self.0.owned.remove(task) |
5099ac24 FG |
327 | } |
328 | ||
329 | fn schedule(&self, task: task::Notified<Self>) { | |
330 | self.0.core.try_lock().unwrap().queue.push_back(task); | |
331 | } | |
332 | } |