use crate::ty::context::TyCtxt;
use crate::ty::query::plumbing::CycleError;
use crate::ty::query::Query;
use crate::ty::tls;

use rustc_data_structures::sync::Lrc;
use syntax_pos::Span;

#[cfg(not(parallel_compiler))]
use std::ptr;

#[cfg(parallel_compiler)]
use {
    parking_lot::{Mutex, Condvar},
    rustc_data_structures::{jobserver, OnDrop},
    rustc_data_structures::fx::FxHashSet,
    rustc_data_structures::stable_hasher::{StableHasher, HashStable},
    rustc_data_structures::sync::Lock,
    rustc_rayon_core as rayon_core,
    syntax_pos::DUMMY_SP,
    std::{mem, process, thread},
    std::iter::FromIterator,
};

/// Represents a span and a query key.
#[derive(Clone, Debug)]
pub struct QueryInfo<'tcx> {
    /// The span corresponding to the reason for which this query was required.
    pub span: Span,
    pub query: Query<'tcx>,
}

/// Represents an active query job.
pub struct QueryJob<'tcx> {
    pub info: QueryInfo<'tcx>,

    /// The parent query job which created this job and is implicitly waiting on it.
    pub parent: Option<Lrc<QueryJob<'tcx>>>,

    /// The latch that is used to wait on this job.
    #[cfg(parallel_compiler)]
    latch: QueryLatch<'tcx>,
}

impl<'tcx> QueryJob<'tcx> {
    /// Creates a new query job.
    pub fn new(info: QueryInfo<'tcx>, parent: Option<Lrc<QueryJob<'tcx>>>) -> Self {
        QueryJob {
            info,
            parent,
            #[cfg(parallel_compiler)]
            latch: QueryLatch::new(),
        }
    }

    /// Awaits completion of the query job.
    #[cfg(parallel_compiler)]
    pub(super) fn r#await(
        &self,
        tcx: TyCtxt<'tcx>,
        span: Span,
    ) -> Result<(), CycleError<'tcx>> {
        tls::with_related_context(tcx, move |icx| {
            let waiter = Lrc::new(QueryWaiter {
                query: icx.query.clone(),
                span,
                cycle: Lock::new(None),
                condvar: Condvar::new(),
            });
            self.latch.r#await(&waiter);
            // FIXME: Get rid of this lock. We have ownership of the QueryWaiter
            // although another thread may still have a Lrc reference so we cannot
            // safely use a mutable reference here.
            let mut cycle = waiter.cycle.lock();
            match cycle.take() {
                None => Ok(()),
                Some(cycle) => Err(cycle)
            }
        })
    }

    #[cfg(not(parallel_compiler))]
    pub(super) fn find_cycle_in_stack(&self, tcx: TyCtxt<'tcx>, span: Span) -> CycleError<'tcx> {
        // Get the current executing query (waiter) and find the waitee amongst its parents
        let mut current_job = tls::with_related_context(tcx, |icx| icx.query.clone());
        let mut cycle = Vec::new();

        while let Some(job) = current_job {
            cycle.push(job.info.clone());

            if ptr::eq(&*job, self) {
                cycle.reverse();

                // This is the end of the cycle
                // The span entry we included was for the usage
                // of the cycle itself, and not part of the cycle
                // Replace it with the span which caused the cycle to form
                cycle[0].span = span;
                // Find out why the cycle itself was used
                let usage = job.parent.as_ref().map(|parent| {
                    (job.info.span, parent.info.query.clone())
                });
                return CycleError { usage, cycle };
            }

            current_job = job.parent.clone();
        }

        panic!("did not find a cycle")
    }

    /// Signals to waiters that the query is complete.
    ///
    /// This does nothing for single threaded rustc,
    /// as there are no concurrent jobs which could be waiting on us
    pub fn signal_complete(&self) {
        #[cfg(parallel_compiler)]
        self.latch.set();
    }

    #[cfg(parallel_compiler)]
    fn as_ptr(&self) -> *const QueryJob<'tcx> {
        self as *const _
    }
}

#[cfg(parallel_compiler)]
struct QueryWaiter<'tcx> {
    query: Option<Lrc<QueryJob<'tcx>>>,
    condvar: Condvar,
    span: Span,
    cycle: Lock<Option<CycleError<'tcx>>>,
}

#[cfg(parallel_compiler)]
impl<'tcx> QueryWaiter<'tcx> {
    fn notify(&self, registry: &rayon_core::Registry) {
        rayon_core::mark_unblocked(registry);
        self.condvar.notify_one();
    }
}

#[cfg(parallel_compiler)]
struct QueryLatchInfo<'tcx> {
    complete: bool,
    waiters: Vec<Lrc<QueryWaiter<'tcx>>>,
}

#[cfg(parallel_compiler)]
struct QueryLatch<'tcx> {
    info: Mutex<QueryLatchInfo<'tcx>>,
}

#[cfg(parallel_compiler)]
impl<'tcx> QueryLatch<'tcx> {
    fn new() -> Self {
        QueryLatch {
            info: Mutex::new(QueryLatchInfo {
                complete: false,
                waiters: Vec::new(),
            }),
        }
    }

    /// Awaits the caller on this latch by blocking the current thread.
    fn r#await(&self, waiter: &Lrc<QueryWaiter<'tcx>>) {
        let mut info = self.info.lock();
        if !info.complete {
            // We push the waiter on to the `waiters` list. It can be accessed inside
            // the `wait` call below, by 1) the `set` method or 2) by deadlock detection.
            // Both of these will remove it from the `waiters` list before resuming
            // this thread.
            info.waiters.push(waiter.clone());

            // If this detects a deadlock and the deadlock handler wants to resume this thread
            // we have to be in the `wait` call. This is ensured by the deadlock handler
            // getting the self.info lock.
            rayon_core::mark_blocked();
            jobserver::release_thread();
            waiter.condvar.wait(&mut info);
            // Release the lock before we potentially block in `acquire_thread`
            mem::drop(info);
            jobserver::acquire_thread();
        }
    }

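    // Note (added commentary, not in the original source): `mark_blocked` tells
    // Rayon that one of its worker threads is about to sleep, and `release_thread`
    // hands this thread's jobserver token back so another thread may run. The
    // matching `mark_unblocked` happens in `QueryWaiter::notify`, before the
    // condvar is signalled, keeping Rayon's count of active threads consistent.
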
    /// Sets the latch and resumes all waiters on it
    fn set(&self) {
        let mut info = self.info.lock();
        debug_assert!(!info.complete);
        info.complete = true;
        let registry = rayon_core::Registry::current();
        for waiter in info.waiters.drain(..) {
            waiter.notify(&registry);
        }
    }

    /// Removes a single waiter from the list of waiters.
    /// This is used to break query cycles.
    fn extract_waiter(
        &self,
        waiter: usize,
    ) -> Lrc<QueryWaiter<'tcx>> {
        let mut info = self.info.lock();
        debug_assert!(!info.complete);
        // Remove the waiter from the list of waiters
        info.waiters.remove(waiter)
    }
}

/// A resumable waiter of a query. The usize is the index into waiters in the query's latch
#[cfg(parallel_compiler)]
type Waiter<'tcx> = (Lrc<QueryJob<'tcx>>, usize);

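// Illustration (added commentary, not in the original source): a `Waiter` names a
// resumable edge `(waitee_query, i)`, meaning entry `i` of
// `waitee_query.latch.info.lock().waiters` is the `QueryWaiter` whose condvar can
// be signalled to break a cycle; see `QueryLatch::extract_waiter`.
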
/// Visits all the non-resumable and resumable waiters of a query.
/// Only waiters in a query are visited.
/// `visit` is called for every waiter and is passed a query waiting on `query_ref`
/// and a span indicating the reason the query waited on `query_ref`.
/// If `visit` returns Some, this function returns.
/// For visits of non-resumable waiters it returns the return value of `visit`.
/// For visits of resumable waiters it returns Some(Some(Waiter)) which has the
/// required information to resume the waiter.
/// If all `visit` calls return None, this function also returns None.
#[cfg(parallel_compiler)]
fn visit_waiters<'tcx, F>(query: Lrc<QueryJob<'tcx>>, mut visit: F) -> Option<Option<Waiter<'tcx>>>
where
    F: FnMut(Span, Lrc<QueryJob<'tcx>>) -> Option<Option<Waiter<'tcx>>>
{
    // Visit the parent query which is a non-resumable waiter since it's on the same stack
    if let Some(ref parent) = query.parent {
        if let Some(cycle) = visit(query.info.span, parent.clone()) {
            return Some(cycle);
        }
    }

    // Visit the explicit waiters which use condvars and are resumable
    for (i, waiter) in query.latch.info.lock().waiters.iter().enumerate() {
        if let Some(ref waiter_query) = waiter.query {
            if visit(waiter.span, waiter_query.clone()).is_some() {
                // Return a value which indicates that this waiter can be resumed
                return Some(Some((query.clone(), i)));
            }
        }
    }

    None
}

/// Look for query cycles by doing a depth first search starting at `query`.
/// `span` is the reason for the `query` to execute. This is initially DUMMY_SP.
/// If a cycle is detected, this initial value is replaced with the span causing
/// the cycle.
#[cfg(parallel_compiler)]
fn cycle_check<'tcx>(query: Lrc<QueryJob<'tcx>>,
                     span: Span,
                     stack: &mut Vec<(Span, Lrc<QueryJob<'tcx>>)>,
                     visited: &mut FxHashSet<*const QueryJob<'tcx>>
) -> Option<Option<Waiter<'tcx>>> {
    if !visited.insert(query.as_ptr()) {
        return if let Some(p) = stack.iter().position(|q| q.1.as_ptr() == query.as_ptr()) {
            // We detected a query cycle, fix up the initial span and return Some

            // Remove previous stack entries
            stack.drain(0..p);
            // Replace the span for the first query with the cycle cause
            stack[0].0 = span;
            Some(None)
        } else {
            None
        }
    }

    // A query marked as visited is added to the stack
    stack.push((span, query.clone()));

    // Visit all the waiters
    let r = visit_waiters(query, |span, successor| {
        cycle_check(successor, span, stack, visited)
    });

    // Remove the entry in our stack if we didn't find a cycle
    if r.is_none() {
        stack.pop();
    }

    r
}

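// Worked example (added commentary, not in the original source): let A wait on B,
// B on C, and C on A. The waiter edges then run A -> C -> B -> A, so starting from
// cycle_check(A, DUMMY_SP, ..) the stack grows to
// [(DUMMY_SP, A), (span_c, C), (span_b, B)], where span_c is the span at which C
// required A. Revisiting A finds it at stack position 0, so nothing is drained and
// stack[0].0 is replaced by the span at which A required B, the span that closed
// the cycle.
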
/// Finds out if there's a path to the compiler root (aka. code which isn't in a query)
/// from `query` without going through any of the queries in `visited`.
/// This is achieved with a depth first search.
#[cfg(parallel_compiler)]
fn connected_to_root<'tcx>(
    query: Lrc<QueryJob<'tcx>>,
    visited: &mut FxHashSet<*const QueryJob<'tcx>>
) -> bool {
    // We already visited this or we're deliberately ignoring it
    if !visited.insert(query.as_ptr()) {
        return false;
    }

    // This query is connected to the root (it has no query parent), return true
    if query.parent.is_none() {
        return true;
    }

    visit_waiters(query, |_, successor| connected_to_root(successor, visited).then_some(None))
        .is_some()
}

// Deterministically pick a query from a list
#[cfg(parallel_compiler)]
fn pick_query<'a, 'tcx, T, F: Fn(&T) -> (Span, Lrc<QueryJob<'tcx>>)>(
    tcx: TyCtxt<'tcx>,
    queries: &'a [T],
    f: F,
) -> &'a T {
    // Deterministically pick an entry point
    // FIXME: Sort this instead
    let mut hcx = tcx.create_stable_hashing_context();
    queries.iter().min_by_key(|v| {
        let (span, query) = f(v);
        let mut stable_hasher = StableHasher::new();
        query.info.query.hash_stable(&mut hcx, &mut stable_hasher);
        // Prefer entry points which have valid spans for nicer error messages
        // We add an integer to the tuple ensuring that entry points
        // with valid spans are picked first
        let span_cmp = if span == DUMMY_SP { 1 } else { 0 };
        (span_cmp, stable_hasher.finish::<u64>())
    }).unwrap()
}

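// Design note (added commentary, not in the original source): hashing the query
// key with StableHasher gives an order that does not depend on pointer values or
// on the nondeterministic order in which parallel jobs appear, so the same
// deadlock always picks the same query and produces the same diagnostic.
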
/// Looks for query cycles starting from the last query in `jobs`.
/// If a cycle is found, all queries in the cycle are removed from `jobs` and
/// the function returns true.
/// If a cycle was not found, the starting query is removed from `jobs` and
/// the function returns false.
#[cfg(parallel_compiler)]
fn remove_cycle<'tcx>(
    jobs: &mut Vec<Lrc<QueryJob<'tcx>>>,
    wakelist: &mut Vec<Lrc<QueryWaiter<'tcx>>>,
    tcx: TyCtxt<'tcx>,
) -> bool {
    let mut visited = FxHashSet::default();
    let mut stack = Vec::new();
    // Look for a cycle starting with the last query in `jobs`
    if let Some(waiter) = cycle_check(jobs.pop().unwrap(),
                                      DUMMY_SP,
                                      &mut stack,
                                      &mut visited) {
        // The stack is a vector of pairs of spans and queries; reverse it so that
        // the earlier entries require later entries
        let (mut spans, queries): (Vec<_>, Vec<_>) = stack.into_iter().rev().unzip();

        // Shift the spans so that queries are matched with the span for their waitee
        spans.rotate_right(1);

        // Zip them back together
        let mut stack: Vec<_> = spans.into_iter().zip(queries).collect();

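        // Worked example (added commentary, not in the original source): with
        // reversed entries [(s0, Q0), (s1, Q1), (s2, Q2)], where si is the span
        // at which Qi required Q(i+1) (indices mod 3), rotating the spans right
        // by one yields [(s2, Q0), (s0, Q1), (s1, Q2)]: each query is now paired
        // with the span at which the previous query in the cycle required it.
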
        // Remove the queries in our cycle from the list of jobs to look at
        for r in &stack {
            if let Some(pos) = jobs.iter().position(|j| j.as_ptr() == r.1.as_ptr()) {
                jobs.remove(pos);
            }
        }

        // Find the queries in the cycle which are
        // connected to queries outside the cycle
        let entry_points = stack.iter().filter_map(|(span, query)| {
            if query.parent.is_none() {
                // This query is connected to the root (it has no query parent)
                Some((*span, query.clone(), None))
            } else {
                let mut waiters = Vec::new();
                // Find all the direct waiters who lead to the root
                visit_waiters(query.clone(), |span, waiter| {
                    // Mark all the other queries in the cycle as already visited
                    let mut visited = FxHashSet::from_iter(stack.iter().map(|q| q.1.as_ptr()));

                    if connected_to_root(waiter.clone(), &mut visited) {
                        waiters.push((span, waiter));
                    }

                    None
                });
                if waiters.is_empty() {
                    None
                } else {
                    // Deterministically pick one of the waiters to show to the user
                    let waiter = pick_query(tcx, &waiters, |s| s.clone()).clone();
                    Some((*span, query.clone(), Some(waiter)))
                }
            }
        }).collect::<Vec<(Span, Lrc<QueryJob<'tcx>>, Option<(Span, Lrc<QueryJob<'tcx>>)>)>>();

        // Deterministically pick an entry point
        let (_, entry_point, usage) = pick_query(tcx, &entry_points, |e| (e.0, e.1.clone()));

        // Shift the stack so that our entry point is first
        let entry_point_pos = stack.iter().position(|(_, query)| {
            query.as_ptr() == entry_point.as_ptr()
        });
        if let Some(pos) = entry_point_pos {
            stack.rotate_left(pos);
        }

        let usage = usage.as_ref().map(|(span, query)| (*span, query.info.query.clone()));

        // Create the cycle error
        let error = CycleError {
            usage,
            cycle: stack.iter().map(|&(s, ref q)| QueryInfo {
                span: s,
                query: q.info.query.clone(),
            }).collect(),
        };

        // We unwrap `waiter` here since there must always be one
        // edge which is resumable / waited using a query latch
        let (waitee_query, waiter_idx) = waiter.unwrap();

        // Extract the waiter we want to resume
        let waiter = waitee_query.latch.extract_waiter(waiter_idx);

        // Set the cycle error so it will be picked up when resumed
        *waiter.cycle.lock() = Some(error);

        // Put the waiter on the list of things to resume
        wakelist.push(waiter);

        true
    } else {
        false
    }
}

/// Creates a new thread and forwards information in thread locals to it.
/// The new thread runs the deadlock handler.
/// Must only be called when a deadlock is about to happen.
#[cfg(parallel_compiler)]
pub unsafe fn handle_deadlock() {
    let registry = rayon_core::Registry::current();

    let gcx_ptr = tls::GCX_PTR.with(|gcx_ptr| {
        gcx_ptr as *const _
    });
    let gcx_ptr = &*gcx_ptr;

    let syntax_pos_globals = syntax_pos::GLOBALS.with(|syntax_pos_globals| {
        syntax_pos_globals as *const _
    });
    let syntax_pos_globals = &*syntax_pos_globals;
    thread::spawn(move || {
        tls::GCX_PTR.set(gcx_ptr, || {
            syntax_pos::GLOBALS.set(syntax_pos_globals, || {
                tls::with_thread_locals(|| {
                    tls::with_global(|tcx| deadlock(tcx, &registry))
                })
            })
        })
    });
}

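// Note (added commentary, not in the original source): the deadlock handler runs
// on a fresh thread because every Rayon worker may be blocked. Raw pointers to
// the thread-local GCX_PTR and syntax_pos globals are captured above and
// re-installed with `set` so that `tls::with_global` works on the new thread;
// this is only sound because the caller guarantees the originals outlive the
// handler, which is why the function is `unsafe`.
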
/// Detects query cycles by using depth first search over all active query jobs.
/// If a query cycle is found it will break the cycle by finding an edge which
/// uses a query latch and then resuming that waiter.
/// There may be multiple cycles involved in a deadlock, so this searches
/// all active queries for cycles before finally resuming all the waiters at once.
#[cfg(parallel_compiler)]
fn deadlock(tcx: TyCtxt<'_>, registry: &rayon_core::Registry) {
    let on_panic = OnDrop(|| {
        eprintln!("deadlock handler panicked, aborting process");
        process::abort();
    });

    let mut wakelist = Vec::new();
    let mut jobs: Vec<_> = tcx.queries.collect_active_jobs();

    let mut found_cycle = false;

    while jobs.len() > 0 {
        if remove_cycle(&mut jobs, &mut wakelist, tcx) {
            found_cycle = true;
        }
    }

    // Check that a cycle was found. It is possible for a deadlock to occur without
    // a query cycle if a query which can be waited on uses Rayon to do multithreading
    // internally. Such a query (X) may be executing on 2 threads (A and B) and A may
    // wait using Rayon on B. Rayon may then switch to executing another query (Y)
    // which in turn will wait on X causing a deadlock. We have a false dependency from
    // X to Y due to Rayon waiting and a true dependency from Y to X. The algorithm here
    // only considers the true dependency and won't detect a cycle.
    assert!(found_cycle);

    // FIXME: Ensure this won't cause a deadlock before we return
    for waiter in wakelist.into_iter() {
        waiter.notify(registry);
    }

    on_panic.disable();
}