]> git.proxmox.com Git - rustc.git/blame - vendor/tokio/src/runtime/tests/task.rs
New upstream version 1.72.1+dfsg1
[rustc.git] / vendor / tokio / src / runtime / tests / task.rs
CommitLineData
fe692bf9
FG
1use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task};
2use crate::runtime::tests::NoopSchedule;
5099ac24
FG
3use crate::util::TryLock;
4
5use std::collections::VecDeque;
fe692bf9
FG
6use std::future::Future;
7use std::sync::atomic::{AtomicBool, Ordering};
5099ac24
FG
8use std::sync::Arc;
9
fe692bf9
FG
10struct AssertDropHandle {
11 is_dropped: Arc<AtomicBool>,
12}
13impl AssertDropHandle {
14 #[track_caller]
15 fn assert_dropped(&self) {
16 assert!(self.is_dropped.load(Ordering::SeqCst));
17 }
18
19 #[track_caller]
20 fn assert_not_dropped(&self) {
21 assert!(!self.is_dropped.load(Ordering::SeqCst));
22 }
23}
24
25struct AssertDrop {
26 is_dropped: Arc<AtomicBool>,
27}
28impl AssertDrop {
29 fn new() -> (Self, AssertDropHandle) {
30 let shared = Arc::new(AtomicBool::new(false));
31 (
32 AssertDrop {
33 is_dropped: shared.clone(),
34 },
35 AssertDropHandle {
36 is_dropped: shared.clone(),
37 },
38 )
39 }
40}
41impl Drop for AssertDrop {
42 fn drop(&mut self) {
43 self.is_dropped.store(true, Ordering::SeqCst);
44 }
45}
46
47// A Notified does not shut down on drop, but it is dropped once the ref-count
48// hits zero.
49#[test]
50fn create_drop1() {
51 let (ad, handle) = AssertDrop::new();
52 let (notified, join) = unowned(
53 async {
54 drop(ad);
55 unreachable!()
56 },
57 NoopSchedule,
58 Id::next(),
59 );
60 drop(notified);
61 handle.assert_not_dropped();
62 drop(join);
63 handle.assert_dropped();
64}
65
5099ac24 66#[test]
fe692bf9
FG
67fn create_drop2() {
68 let (ad, handle) = AssertDrop::new();
69 let (notified, join) = unowned(
70 async {
71 drop(ad);
72 unreachable!()
73 },
74 NoopSchedule,
75 Id::next(),
76 );
77 drop(join);
78 handle.assert_not_dropped();
79 drop(notified);
80 handle.assert_dropped();
81}
82
83#[test]
84fn drop_abort_handle1() {
85 let (ad, handle) = AssertDrop::new();
86 let (notified, join) = unowned(
87 async {
88 drop(ad);
89 unreachable!()
90 },
91 NoopSchedule,
92 Id::next(),
93 );
94 let abort = join.abort_handle();
95 drop(join);
96 handle.assert_not_dropped();
97 drop(notified);
98 handle.assert_not_dropped();
99 drop(abort);
100 handle.assert_dropped();
101}
102
103#[test]
104fn drop_abort_handle2() {
105 let (ad, handle) = AssertDrop::new();
106 let (notified, join) = unowned(
107 async {
108 drop(ad);
109 unreachable!()
110 },
111 NoopSchedule,
112 Id::next(),
113 );
114 let abort = join.abort_handle();
115 drop(notified);
116 handle.assert_not_dropped();
117 drop(abort);
118 handle.assert_not_dropped();
119 drop(join);
120 handle.assert_dropped();
121}
122
123// Shutting down through Notified works
124#[test]
125fn create_shutdown1() {
126 let (ad, handle) = AssertDrop::new();
127 let (notified, join) = unowned(
128 async {
129 drop(ad);
130 unreachable!()
131 },
132 NoopSchedule,
133 Id::next(),
134 );
135 drop(join);
136 handle.assert_not_dropped();
137 notified.shutdown();
138 handle.assert_dropped();
139}
140
141#[test]
142fn create_shutdown2() {
143 let (ad, handle) = AssertDrop::new();
144 let (notified, join) = unowned(
145 async {
146 drop(ad);
147 unreachable!()
148 },
149 NoopSchedule,
150 Id::next(),
151 );
152 handle.assert_not_dropped();
153 notified.shutdown();
154 handle.assert_dropped();
155 drop(join);
156}
157
158#[test]
159fn unowned_poll() {
160 let (task, _) = unowned(async {}, NoopSchedule, Id::next());
161 task.run();
5099ac24
FG
162}
163
164#[test]
165fn schedule() {
166 with(|rt| {
fe692bf9 167 rt.spawn(async {
5099ac24
FG
168 crate::task::yield_now().await;
169 });
170
5099ac24 171 assert_eq!(2, rt.tick());
fe692bf9 172 rt.shutdown();
5099ac24
FG
173 })
174}
175
176#[test]
177fn shutdown() {
178 with(|rt| {
fe692bf9 179 rt.spawn(async {
5099ac24
FG
180 loop {
181 crate::task::yield_now().await;
182 }
183 });
184
5099ac24
FG
185 rt.tick_max(1);
186
187 rt.shutdown();
188 })
189}
190
fe692bf9
FG
191#[test]
192fn shutdown_immediately() {
193 with(|rt| {
194 rt.spawn(async {
195 loop {
196 crate::task::yield_now().await;
197 }
198 });
199
200 rt.shutdown();
201 })
202}
203
204#[test]
205fn spawn_during_shutdown() {
206 static DID_SPAWN: AtomicBool = AtomicBool::new(false);
207
208 struct SpawnOnDrop(Runtime);
209 impl Drop for SpawnOnDrop {
210 fn drop(&mut self) {
211 DID_SPAWN.store(true, Ordering::SeqCst);
212 self.0.spawn(async {});
213 }
214 }
215
216 with(|rt| {
217 let rt2 = rt.clone();
218 rt.spawn(async move {
219 let _spawn_on_drop = SpawnOnDrop(rt2);
220
221 loop {
222 crate::task::yield_now().await;
223 }
224 });
225
226 rt.tick_max(1);
227 rt.shutdown();
228 });
229
230 assert!(DID_SPAWN.load(Ordering::SeqCst));
231}
232
5099ac24
FG
233fn with(f: impl FnOnce(Runtime)) {
234 struct Reset;
235
236 impl Drop for Reset {
237 fn drop(&mut self) {
238 let _rt = CURRENT.try_lock().unwrap().take();
239 }
240 }
241
242 let _reset = Reset;
243
244 let rt = Runtime(Arc::new(Inner {
fe692bf9 245 owned: OwnedTasks::new(),
5099ac24
FG
246 core: TryLock::new(Core {
247 queue: VecDeque::new(),
5099ac24
FG
248 }),
249 }));
250
251 *CURRENT.try_lock().unwrap() = Some(rt.clone());
252 f(rt)
253}
254
255#[derive(Clone)]
256struct Runtime(Arc<Inner>);
257
258struct Inner {
5099ac24 259 core: TryLock<Core>,
fe692bf9 260 owned: OwnedTasks<Runtime>,
5099ac24
FG
261}
262
263struct Core {
264 queue: VecDeque<task::Notified<Runtime>>,
5099ac24
FG
265}
266
267static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None);
268
269impl Runtime {
fe692bf9
FG
270 fn spawn<T>(&self, future: T) -> JoinHandle<T::Output>
271 where
272 T: 'static + Send + Future,
273 T::Output: 'static + Send,
274 {
275 let (handle, notified) = self.0.owned.bind(future, self.clone(), Id::next());
276
277 if let Some(notified) = notified {
278 self.schedule(notified);
279 }
280
281 handle
282 }
283
5099ac24
FG
284 fn tick(&self) -> usize {
285 self.tick_max(usize::MAX)
286 }
287
288 fn tick_max(&self, max: usize) -> usize {
289 let mut n = 0;
290
291 while !self.is_empty() && n < max {
292 let task = self.next_task();
293 n += 1;
fe692bf9 294 let task = self.0.owned.assert_owner(task);
5099ac24
FG
295 task.run();
296 }
297
5099ac24
FG
298 n
299 }
300
301 fn is_empty(&self) -> bool {
302 self.0.core.try_lock().unwrap().queue.is_empty()
303 }
304
305 fn next_task(&self) -> task::Notified<Runtime> {
306 self.0.core.try_lock().unwrap().queue.pop_front().unwrap()
307 }
308
309 fn shutdown(&self) {
310 let mut core = self.0.core.try_lock().unwrap();
311
fe692bf9 312 self.0.owned.close_and_shutdown_all();
5099ac24
FG
313
314 while let Some(task) = core.queue.pop_back() {
fe692bf9 315 drop(task);
5099ac24
FG
316 }
317
318 drop(core);
319
fe692bf9 320 assert!(self.0.owned.is_empty());
5099ac24
FG
321 }
322}
323
324impl Schedule for Runtime {
5099ac24 325 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
fe692bf9 326 self.0.owned.remove(task)
5099ac24
FG
327 }
328
329 fn schedule(&self, task: task::Notified<Self>) {
330 self.0.core.try_lock().unwrap().queue.push_back(task);
331 }
332}