]> git.proxmox.com Git - cargo.git/commitdiff
Registry functions return task::Poll to enable parallel fetching of index data.
authorArlo Siemsen <arsiem@microsoft.com>
Mon, 8 Nov 2021 21:56:44 +0000 (13:56 -0800)
committerArlo Siemsen <arsiem@microsoft.com>
Mon, 28 Feb 2022 20:22:11 +0000 (12:22 -0800)
17 files changed:
crates/resolver-tests/src/lib.rs
src/cargo/core/compiler/future_incompat.rs
src/cargo/core/registry.rs
src/cargo/core/resolver/dep_cache.rs
src/cargo/core/resolver/errors.rs
src/cargo/core/resolver/mod.rs
src/cargo/core/source/mod.rs
src/cargo/ops/common_for_install_and_uninstall.rs
src/cargo/sources/directory.rs
src/cargo/sources/git/source.rs
src/cargo/sources/path.rs
src/cargo/sources/registry/index.rs
src/cargo/sources/registry/local.rs
src/cargo/sources/registry/mod.rs
src/cargo/sources/registry/remote.rs
src/cargo/sources/replaced.rs
src/cargo/util/network.rs

index f47017d1d2a52be95770e67837b66223652a4eeb..eae6469545d40c2d214823e4915c4e353e7e12a1 100644 (file)
@@ -7,6 +7,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
 use std::fmt;
 use std::fmt::Write;
 use std::rc::Rc;
+use std::task::Poll;
 use std::time::Instant;
 
 use cargo::core::dependency::DepKind;
@@ -129,14 +130,14 @@ pub fn resolve_with_config_raw(
             dep: &Dependency,
             f: &mut dyn FnMut(Summary),
             fuzzy: bool,
-        ) -> CargoResult<()> {
+        ) -> Poll<CargoResult<()>> {
             for summary in self.list.iter() {
                 if fuzzy || dep.matches(summary) {
                     self.used.insert(summary.package_id());
                     f(summary.clone());
                 }
             }
-            Ok(())
+            Poll::Ready(Ok(()))
         }
 
         fn describe_source(&self, _src: SourceId) -> String {
@@ -146,6 +147,10 @@ pub fn resolve_with_config_raw(
         fn is_replaced(&self, _src: SourceId) -> bool {
             false
         }
+
+        fn block_until_ready(&mut self) -> CargoResult<()> {
+            Ok(())
+        }
     }
     impl<'a> Drop for MyRegistry<'a> {
         fn drop(&mut self) {
index e13d331e0b887cf507f4928c799f37d942d506b0..b838041aaca38cd3ef49fc156119885a75c13243 100644 (file)
@@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
 use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
 use std::fmt::Write as _;
 use std::io::{Read, Write};
+use std::task::Poll;
 
 pub const REPORT_PREAMBLE: &str = "\
 The following warnings were discovered during the build. These warnings are an
@@ -264,7 +265,7 @@ fn get_updates(ws: &Workspace<'_>, package_ids: &BTreeSet<PackageId>) -> Option<
     let _lock = ws.config().acquire_package_cache_lock().ok()?;
     // Create a set of updated registry sources.
     let map = SourceConfigMap::new(ws.config()).ok()?;
-    let package_ids: BTreeSet<_> = package_ids
+    let mut package_ids: BTreeSet<_> = package_ids
         .iter()
         .filter(|pkg_id| pkg_id.source_id().is_registry())
         .collect();
@@ -279,15 +280,35 @@ fn get_updates(ws: &Workspace<'_>, package_ids: &BTreeSet<PackageId>) -> Option<
             Some((sid, source))
         })
         .collect();
-    // Query the sources for new versions.
+
+    // Query the sources for available versions, mapping `package_ids` into `summaries`.
+    let mut summaries = Vec::new();
+    while !package_ids.is_empty() {
+        package_ids.retain(|&pkg_id| {
+            let source = match sources.get_mut(&pkg_id.source_id()) {
+                Some(s) => s,
+                None => return false,
+            };
+            let dep = match Dependency::parse(pkg_id.name(), None, pkg_id.source_id()) {
+                Ok(dep) => dep,
+                Err(_) => return false,
+            };
+            match source.query_vec(&dep) {
+                Poll::Ready(Ok(sum)) => {
+                    summaries.push((pkg_id, sum));
+                    false
+                }
+                Poll::Ready(Err(_)) => false,
+                Poll::Pending => true,
+            }
+        });
+        for (_, source) in sources.iter_mut() {
+            source.block_until_ready().ok()?;
+        }
+    }
+
     let mut updates = String::new();
-    for pkg_id in package_ids {
-        let source = match sources.get_mut(&pkg_id.source_id()) {
-            Some(s) => s,
-            None => continue,
-        };
-        let dep = Dependency::parse(pkg_id.name(), None, pkg_id.source_id()).ok()?;
-        let summaries = source.query_vec(&dep).ok()?;
+    for (pkg_id, summaries) in summaries {
         let mut updated_versions: Vec<_> = summaries
             .iter()
             .map(|summary| summary.version())
index 47a1179670171d8dd69c3ecae4defb88214a7dc4..f158f85e176db9a46f836cd213bfb3e207b50f14 100644 (file)
@@ -1,4 +1,5 @@
 use std::collections::{HashMap, HashSet};
+use std::task::Poll;
 
 use crate::core::PackageSet;
 use crate::core::{Dependency, PackageId, Source, SourceId, SourceMap, Summary};
@@ -20,16 +21,19 @@ pub trait Registry {
         dep: &Dependency,
         f: &mut dyn FnMut(Summary),
         fuzzy: bool,
-    ) -> CargoResult<()>;
+    ) -> Poll<CargoResult<()>>;
 
-    fn query_vec(&mut self, dep: &Dependency, fuzzy: bool) -> CargoResult<Vec<Summary>> {
+    fn query_vec(&mut self, dep: &Dependency, fuzzy: bool) -> Poll<CargoResult<Vec<Summary>>> {
         let mut ret = Vec::new();
-        self.query(dep, &mut |s| ret.push(s), fuzzy)?;
-        Ok(ret)
+        self.query(dep, &mut |s| ret.push(s), fuzzy)
+            .map_ok(|()| ret)
     }
 
     fn describe_source(&self, source: SourceId) -> String;
     fn is_replaced(&self, source: SourceId) -> bool;
+
+    /// Block until all outstanding Poll::Pending requests are Poll::Ready.
+    fn block_until_ready(&mut self) -> CargoResult<()>;
 }
 
 /// This structure represents a registry of known packages. It internally
@@ -274,16 +278,18 @@ impl<'cfg> PackageRegistry<'cfg> {
         // Remember that each dependency listed in `[patch]` has to resolve to
         // precisely one package, so that's why we're just creating a flat list
         // of summaries which should be the same length as `deps` above.
-        let unlocked_summaries = deps
-            .iter()
-            .map(|(orig_patch, locked)| {
-                // Remove double reference in orig_patch. Is there maybe a
-                // magic pattern that could avoid this?
-                let orig_patch = *orig_patch;
+
+        let mut deps_remaining: Vec<_> = deps.iter().collect();
+        let mut unlocked_summaries = Vec::new();
+        while !deps_remaining.is_empty() {
+            let mut deps_pending = Vec::new();
+            for dep_remaining in deps_remaining {
+                let (orig_patch, locked) = dep_remaining;
+
                 // Use the locked patch if it exists, otherwise use the original.
                 let dep = match locked {
                     Some(lock) => &lock.dependency,
-                    None => orig_patch,
+                    None => *orig_patch,
                 };
                 debug!(
                     "registering a patch for `{}` with `{}`",
@@ -314,37 +320,55 @@ impl<'cfg> PackageRegistry<'cfg> {
                     .sources
                     .get_mut(dep.source_id())
                     .expect("loaded source not present");
-                let summaries = source.query_vec(dep)?;
-                let (summary, should_unlock) = summary_for_patch(
-                    orig_patch, locked, summaries, source,
-                )
-                .with_context(|| {
-                    format!(
-                        "patch for `{}` in `{}` failed to resolve",
-                        orig_patch.package_name(),
-                        url,
-                    )
-                })?;
+
+                let summaries = match source.query_vec(dep)? {
+                    Poll::Ready(deps) => deps,
+                    Poll::Pending => {
+                        deps_pending.push(dep_remaining);
+                        continue;
+                    }
+                };
+
+                let (summary, should_unlock) =
+                    match summary_for_patch(orig_patch, &locked, summaries, source) {
+                        Poll::Ready(x) => x,
+                        Poll::Pending => {
+                            deps_pending.push(dep_remaining);
+                            continue;
+                        }
+                    }
+                    .with_context(|| {
+                        format!(
+                            "patch for `{}` in `{}` failed to resolve",
+                            orig_patch.package_name(),
+                            url,
+                        )
+                    })
+                    .with_context(|| format!("failed to resolve patches for `{}`", url))?;
+
                 debug!(
                     "patch summary is {:?} should_unlock={:?}",
                     summary, should_unlock
                 );
                 if let Some(unlock_id) = should_unlock {
-                    unlock_patches.push((orig_patch.clone(), unlock_id));
+                    unlock_patches.push(((*orig_patch).clone(), unlock_id));
                 }
 
                 if *summary.package_id().source_id().canonical_url() == canonical {
-                    anyhow::bail!(
+                    return Err(anyhow::anyhow!(
                         "patch for `{}` in `{}` points to the same source, but \
-                         patches must point to different sources",
+                        patches must point to different sources",
                         dep.package_name(),
                         url
-                    );
+                    ))
+                    .context(format!("failed to resolve patches for `{}`", url));
                 }
-                Ok(summary)
-            })
-            .collect::<CargoResult<Vec<_>>>()
-            .with_context(|| format!("failed to resolve patches for `{}`", url))?;
+                unlocked_summaries.push(summary);
+            }
+
+            deps_remaining = deps_pending;
+            self.block_until_ready()?;
+        }
 
         let mut name_and_version = HashSet::new();
         for summary in unlocked_summaries.iter() {
@@ -444,9 +468,13 @@ impl<'cfg> PackageRegistry<'cfg> {
         for &s in self.overrides.iter() {
             let src = self.sources.get_mut(s).unwrap();
             let dep = Dependency::new_override(dep.package_name(), s);
-            let mut results = src.query_vec(&dep)?;
-            if !results.is_empty() {
-                return Ok(Some(results.remove(0)));
+            match src.query_vec(&dep)? {
+                Poll::Ready(mut results) => {
+                    if !results.is_empty() {
+                        return Ok(Some(results.remove(0)));
+                    }
+                }
+                Poll::Pending => panic!("overrides have to be on path deps, how did we get here?"),
             }
         }
         Ok(None)
@@ -535,7 +563,7 @@ impl<'cfg> Registry for PackageRegistry<'cfg> {
         dep: &Dependency,
         f: &mut dyn FnMut(Summary),
         fuzzy: bool,
-    ) -> CargoResult<()> {
+    ) -> Poll<CargoResult<()>> {
         assert!(self.patches_locked);
         let (override_summary, n, to_warn) = {
             // Look for an override and get ready to query the real source.
@@ -569,7 +597,7 @@ impl<'cfg> Registry for PackageRegistry<'cfg> {
                     Some(summary) => (summary, 1, Some(patch)),
                     None => {
                         f(patch);
-                        return Ok(());
+                        return Poll::Ready(Ok(()));
                     }
                 }
             } else {
@@ -596,8 +624,10 @@ impl<'cfg> Registry for PackageRegistry<'cfg> {
 
                 let source = self.sources.get_mut(dep.source_id());
                 match (override_summary, source) {
-                    (Some(_), None) => anyhow::bail!("override found but no real ones"),
-                    (None, None) => return Ok(()),
+                    (Some(_), None) => {
+                        return Err(anyhow::anyhow!("override found but no real ones")).into()
+                    }
+                    (None, None) => return Poll::Ready(Ok(())),
 
                     // If we don't have an override then we just ship
                     // everything upstairs after locking the summary
@@ -636,7 +666,8 @@ impl<'cfg> Registry for PackageRegistry<'cfg> {
                     // the summaries it gives us though.
                     (Some(override_summary), Some(source)) => {
                         if !patches.is_empty() {
-                            anyhow::bail!("found patches and a path override")
+                            return Err(anyhow::anyhow!("found patches and a path override"))
+                                .into();
                         }
                         let mut n = 0;
                         let mut to_warn = None;
@@ -645,10 +676,13 @@ impl<'cfg> Registry for PackageRegistry<'cfg> {
                                 n += 1;
                                 to_warn = Some(summary);
                             };
-                            if fuzzy {
-                                source.fuzzy_query(dep, callback)?;
+                            let pend = if fuzzy {
+                                source.fuzzy_query(dep, callback)?
                             } else {
-                                source.query(dep, callback)?;
+                                source.query(dep, callback)?
+                            };
+                            if pend.is_pending() {
+                                return Poll::Pending;
                             }
                         }
                         (override_summary, n, to_warn)
@@ -658,12 +692,12 @@ impl<'cfg> Registry for PackageRegistry<'cfg> {
         };
 
         if n > 1 {
-            anyhow::bail!("found an override with a non-locked list");
+            return Err(anyhow::anyhow!("found an override with a non-locked list")).into();
         } else if let Some(summary) = to_warn {
             self.warn_bad_override(&override_summary, &summary)?;
         }
         f(self.lock(override_summary));
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 
     fn describe_source(&self, id: SourceId) -> String {
@@ -679,6 +713,13 @@ impl<'cfg> Registry for PackageRegistry<'cfg> {
             None => false,
         }
     }
+
+    fn block_until_ready(&mut self) -> CargoResult<()> {
+        for (_, source) in self.sources.sources_mut() {
+            source.block_until_ready()?;
+        }
+        Ok(())
+    }
 }
 
 fn lock(
@@ -795,9 +836,9 @@ fn summary_for_patch(
     locked: &Option<LockedPatchDependency>,
     mut summaries: Vec<Summary>,
     source: &mut dyn Source,
-) -> CargoResult<(Summary, Option<PackageId>)> {
+) -> Poll<CargoResult<(Summary, Option<PackageId>)>> {
     if summaries.len() == 1 {
-        return Ok((summaries.pop().unwrap(), None));
+        return Poll::Ready(Ok((summaries.pop().unwrap(), None)));
     }
     if summaries.len() > 1 {
         // TODO: In the future, it might be nice to add all of these
@@ -810,7 +851,7 @@ fn summary_for_patch(
         let mut vers: Vec<_> = summaries.iter().map(|summary| summary.version()).collect();
         vers.sort();
         let versions: Vec<_> = vers.into_iter().map(|v| v.to_string()).collect();
-        anyhow::bail!(
+        return Err(anyhow::anyhow!(
             "patch for `{}` in `{}` resolved to more than one candidate\n\
             Found versions: {}\n\
             Update the patch definition to select only one package.\n\
@@ -820,13 +861,18 @@ fn summary_for_patch(
             orig_patch.source_id(),
             versions.join(", "),
             versions.last().unwrap()
-        );
+        ))
+        .into();
     }
     assert!(summaries.is_empty());
     // No summaries found, try to help the user figure out what is wrong.
     if let Some(locked) = locked {
         // Since the locked patch did not match anything, try the unlocked one.
-        let orig_matches = source.query_vec(orig_patch).unwrap_or_else(|e| {
+        let orig_matches = match source.query_vec(orig_patch) {
+            Poll::Pending => return Poll::Pending,
+            Poll::Ready(deps) => deps,
+        }
+        .unwrap_or_else(|e| {
             log::warn!(
                 "could not determine unlocked summaries for dep {:?}: {:?}",
                 orig_patch,
@@ -834,14 +880,24 @@ fn summary_for_patch(
             );
             Vec::new()
         });
-        let (summary, _) = summary_for_patch(orig_patch, &None, orig_matches, source)?;
+
+        let summary = match summary_for_patch(orig_patch, &None, orig_matches, source) {
+            Poll::Pending => return Poll::Pending,
+            Poll::Ready(summary) => summary?,
+        };
+
         // The unlocked version found a match. This returns a value to
         // indicate that this entry should be unlocked.
-        return Ok((summary, Some(locked.package_id)));
+        return Poll::Ready(Ok((summary.0, Some(locked.package_id))));
     }
     // Try checking if there are *any* packages that match this by name.
     let name_only_dep = Dependency::new_override(orig_patch.package_name(), orig_patch.source_id());
-    let name_summaries = source.query_vec(&name_only_dep).unwrap_or_else(|e| {
+
+    let name_summaries = match source.query_vec(&name_only_dep) {
+        Poll::Pending => return Poll::Pending,
+        Poll::Ready(deps) => deps,
+    }
+    .unwrap_or_else(|e| {
         log::warn!(
             "failed to do name-only summary query for {:?}: {:?}",
             name_only_dep,
@@ -862,15 +918,15 @@ fn summary_for_patch(
             format!("versions `{}`", strs.join(", "))
         }
     };
-    if found.is_empty() {
-        anyhow::bail!(
+    Err(if found.is_empty() {
+        anyhow::anyhow!(
             "The patch location `{}` does not appear to contain any packages \
             matching the name `{}`.",
             orig_patch.source_id(),
             orig_patch.package_name()
-        );
+        )
     } else {
-        anyhow::bail!(
+        anyhow::anyhow!(
             "The patch location `{}` contains a `{}` package with {}, but the patch \
             definition requires `{}`.\n\
             Check that the version in the patch location is what you expect, \
@@ -879,6 +935,7 @@ fn summary_for_patch(
             orig_patch.package_name(),
             found,
             orig_patch.version_req()
-        );
-    }
+        )
+    })
+    .into()
 }
index d3ad787d589abe8d6eda7ecec938deb97c9ee9a2..50e91b34b44dafbb275c781cce744b7727f0957b 100644 (file)
@@ -24,6 +24,7 @@ use anyhow::Context as _;
 use log::debug;
 use std::collections::{BTreeSet, HashMap, HashSet};
 use std::rc::Rc;
+use std::task::Poll;
 
 pub struct RegistryQueryer<'a> {
     pub registry: &'a mut (dyn Registry + 'a),
@@ -34,11 +35,11 @@ pub struct RegistryQueryer<'a> {
     /// specify minimum dependency versions to be used.
     minimal_versions: bool,
     /// a cache of `Candidate`s that fulfil a `Dependency`
-    registry_cache: HashMap<Dependency, Rc<Vec<Summary>>>,
+    registry_cache: HashMap<Dependency, Poll<Rc<Vec<Summary>>>>,
     /// a cache of `Dependency`s that are required for a `Summary`
     summary_cache: HashMap<
         (Option<PackageId>, Summary, ResolveOpts),
-        Rc<(HashSet<InternedString>, Rc<Vec<DepInfo>>)>,
+        (Rc<(HashSet<InternedString>, Rc<Vec<DepInfo>>)>, bool),
     >,
     /// all the cases we ended up using a supplied replacement
     used_replacements: HashMap<PackageId, Summary>,
@@ -62,6 +63,23 @@ impl<'a> RegistryQueryer<'a> {
         }
     }
 
+    pub fn reset_pending(&mut self) -> bool {
+        let mut all_ready = true;
+        self.registry_cache.retain(|_, r| {
+            if !r.is_ready() {
+                all_ready = false;
+            }
+            r.is_ready()
+        });
+        self.summary_cache.retain(|_, (_, r)| {
+            if !*r {
+                all_ready = false;
+            }
+            *r
+        });
+        all_ready
+    }
+
     pub fn used_replacement_for(&self, p: PackageId) -> Option<(PackageId, PackageId)> {
         self.used_replacements.get(&p).map(|r| (p, r.package_id()))
     }
@@ -76,19 +94,23 @@ impl<'a> RegistryQueryer<'a> {
     /// any candidates are returned which match an override then the override is
     /// applied by performing a second query for what the override should
     /// return.
-    pub fn query(&mut self, dep: &Dependency) -> CargoResult<Rc<Vec<Summary>>> {
+    pub fn query(&mut self, dep: &Dependency) -> Poll<CargoResult<Rc<Vec<Summary>>>> {
         if let Some(out) = self.registry_cache.get(dep).cloned() {
-            return Ok(out);
+            return out.map(Result::Ok);
         }
 
         let mut ret = Vec::new();
-        self.registry.query(
+        let ready = self.registry.query(
             dep,
             &mut |s| {
                 ret.push(s);
             },
             false,
         )?;
+        if ready.is_pending() {
+            self.registry_cache.insert(dep.clone(), Poll::Pending);
+            return Poll::Pending;
+        }
         for summary in ret.iter_mut() {
             let mut potential_matches = self
                 .replacements
@@ -105,7 +127,13 @@ impl<'a> RegistryQueryer<'a> {
                 dep.version_req()
             );
 
-            let mut summaries = self.registry.query_vec(dep, false)?.into_iter();
+            let mut summaries = match self.registry.query_vec(dep, false)? {
+                Poll::Ready(s) => s.into_iter(),
+                Poll::Pending => {
+                    self.registry_cache.insert(dep.clone(), Poll::Pending);
+                    return Poll::Pending;
+                }
+            };
             let s = summaries.next().ok_or_else(|| {
                 anyhow::format_err!(
                     "no matching package for override `{}` found\n\
@@ -122,13 +150,14 @@ impl<'a> RegistryQueryer<'a> {
                     .iter()
                     .map(|s| format!("  * {}", s.package_id()))
                     .collect::<Vec<_>>();
-                anyhow::bail!(
+                return Err(anyhow::anyhow!(
                     "the replacement specification `{}` matched \
                      multiple packages:\n  * {}\n{}",
                     spec,
                     s.package_id(),
                     bullets.join("\n")
-                );
+                ))
+                .into();
             }
 
             // The dependency should be hard-coded to have the same name and an
@@ -147,13 +176,14 @@ impl<'a> RegistryQueryer<'a> {
 
             // Make sure no duplicates
             if let Some(&(ref spec, _)) = potential_matches.next() {
-                anyhow::bail!(
+                return Err(anyhow::anyhow!(
                     "overlapping replacement specifications found:\n\n  \
                      * {}\n  * {}\n\nboth specifications match: {}",
                     matched_spec,
                     spec,
                     summary.package_id()
-                );
+                ))
+                .into();
             }
 
             for dep in summary.dependencies() {
@@ -175,11 +205,11 @@ impl<'a> RegistryQueryer<'a> {
             },
         );
 
-        let out = Rc::new(ret);
+        let out = Poll::Ready(Rc::new(ret));
 
         self.registry_cache.insert(dep.clone(), out.clone());
 
-        Ok(out)
+        out.map(Result::Ok)
     }
 
     /// Find out what dependencies will be added by activating `candidate`,
@@ -198,9 +228,8 @@ impl<'a> RegistryQueryer<'a> {
         if let Some(out) = self
             .summary_cache
             .get(&(parent, candidate.clone(), opts.clone()))
-            .cloned()
         {
-            return Ok(out);
+            return Ok(out.0.clone());
         }
         // First, figure out our set of dependencies based on the requested set
         // of features. This also calculates what features we're going to enable
@@ -209,17 +238,22 @@ impl<'a> RegistryQueryer<'a> {
 
         // Next, transform all dependencies into a list of possible candidates
         // which can satisfy that dependency.
+        let mut all_ready = true;
         let mut deps = deps
             .into_iter()
-            .map(|(dep, features)| {
-                let candidates = self.query(&dep).with_context(|| {
+            .filter_map(|(dep, features)| match self.query(&dep) {
+                Poll::Ready(Ok(candidates)) => Some(Ok((dep, candidates, features))),
+                Poll::Pending => {
+                    all_ready = false;
+                    None // we can ignore Pending deps, resolve will be repeatedly called until there are none to ignore
+                }
+                Poll::Ready(Err(e)) => Some(Err(e).with_context(|| {
                     format!(
                         "failed to get `{}` as a dependency of {}",
                         dep.package_name(),
                         describe_path_in_context(cx, &candidate.package_id()),
                     )
-                })?;
-                Ok((dep, candidates, features))
+                })),
             })
             .collect::<CargoResult<Vec<DepInfo>>>()?;
 
@@ -233,8 +267,10 @@ impl<'a> RegistryQueryer<'a> {
 
         // If we succeed we add the result to the cache so we can use it again next time.
         // We don't cache the failure cases as they don't impl Clone.
-        self.summary_cache
-            .insert((parent, candidate.clone(), opts.clone()), out.clone());
+        self.summary_cache.insert(
+            (parent, candidate.clone(), opts.clone()),
+            (out.clone(), all_ready),
+        );
 
         Ok(out)
     }
index 5cabd01bae4fc25745dd2404da332ff0d9bac783..70e4593406eb3d425d40ba97b90008508213e213 100644 (file)
@@ -1,4 +1,5 @@
 use std::fmt;
+use std::task::Poll;
 
 use crate::core::{Dependency, PackageId, Registry, Summary};
 use crate::util::lev_distance::lev_distance;
@@ -217,120 +218,128 @@ pub(super) fn activation_error(
     let all_req = semver::VersionReq::parse("*").unwrap();
     let mut new_dep = dep.clone();
     new_dep.set_version_req(all_req);
-    let mut candidates = match registry.query_vec(&new_dep, false) {
-        Ok(candidates) => candidates,
-        Err(e) => return to_resolve_err(e),
+
+    let mut candidates = Vec::new();
+    // TODO: we can ignore the `Pending` case because we are just in an error reporting path,
+    // and we have probably already triggered the query anyway. But, if we start getting reports
+    // of confusing errors that go away when called again this is a place to look.
+    if let Poll::Ready(Err(e)) = registry.query(&new_dep, &mut |s| candidates.push(s), false) {
+        return to_resolve_err(e);
     };
     candidates.sort_unstable_by(|a, b| b.version().cmp(a.version()));
 
-    let mut msg =
-        if !candidates.is_empty() {
-            let versions = {
-                let mut versions = candidates
-                    .iter()
-                    .take(3)
-                    .map(|cand| cand.version().to_string())
-                    .collect::<Vec<_>>();
+    let mut msg = if !candidates.is_empty() {
+        let versions = {
+            let mut versions = candidates
+                .iter()
+                .take(3)
+                .map(|cand| cand.version().to_string())
+                .collect::<Vec<_>>();
 
-                if candidates.len() > 3 {
-                    versions.push("...".into());
-                }
+            if candidates.len() > 3 {
+                versions.push("...".into());
+            }
 
-                versions.join(", ")
-            };
+            versions.join(", ")
+        };
 
-            let mut msg = format!(
-                "failed to select a version for the requirement `{} = \"{}\"`\n\
+        let mut msg = format!(
+            "failed to select a version for the requirement `{} = \"{}\"`\n\
                  candidate versions found which didn't match: {}\n\
                  location searched: {}\n",
-                dep.package_name(),
-                dep.version_req(),
-                versions,
-                registry.describe_source(dep.source_id()),
-            );
-            msg.push_str("required by ");
-            msg.push_str(&describe_path_in_context(cx, &parent.package_id()));
+            dep.package_name(),
+            dep.version_req(),
+            versions,
+            registry.describe_source(dep.source_id()),
+        );
+        msg.push_str("required by ");
+        msg.push_str(&describe_path_in_context(cx, &parent.package_id()));
 
-            // If we have a path dependency with a locked version, then this may
-            // indicate that we updated a sub-package and forgot to run `cargo
-            // update`. In this case try to print a helpful error!
-            if dep.source_id().is_path() && dep.version_req().to_string().starts_with('=') {
-                msg.push_str(
-                    "\nconsider running `cargo update` to update \
+        // If we have a path dependency with a locked version, then this may
+        // indicate that we updated a sub-package and forgot to run `cargo
+        // update`. In this case try to print a helpful error!
+        if dep.source_id().is_path() && dep.version_req().to_string().starts_with('=') {
+            msg.push_str(
+                "\nconsider running `cargo update` to update \
                      a path dependency's locked version",
-                );
-            }
+            );
+        }
 
-            if registry.is_replaced(dep.source_id()) {
-                msg.push_str("\nperhaps a crate was updated and forgotten to be re-vendored?");
-            }
+        if registry.is_replaced(dep.source_id()) {
+            msg.push_str("\nperhaps a crate was updated and forgotten to be re-vendored?");
+        }
 
-            msg
+        msg
+    } else {
+        // Maybe the user mistyped the name? Like `dep-thing` when `Dep_Thing`
+        // was meant. So we try asking the registry for a `fuzzy` search for suggestions.
+        let mut candidates = Vec::new();
+        if let Poll::Ready(Err(e)) = registry.query(&new_dep, &mut |s| candidates.push(s), true) {
+            // TODO: we can ignore the `Pending` case because we are just in an error reporting path,
+            // and we have probably already triggered the query anyway. But, if we start getting reports
+            // of confusing errors that go away when called again this is a place to look.
+            return to_resolve_err(e);
+        };
+        candidates.sort_unstable_by_key(|a| a.name());
+        candidates.dedup_by(|a, b| a.name() == b.name());
+        let mut candidates: Vec<_> = candidates
+            .iter()
+            .map(|n| (lev_distance(&*new_dep.package_name(), &*n.name()), n))
+            .filter(|&(d, _)| d < 4)
+            .collect();
+        candidates.sort_by_key(|o| o.0);
+        let mut msg: String;
+        if candidates.is_empty() {
+            msg = format!("no matching package named `{}` found\n", dep.package_name());
         } else {
-            // Maybe the user mistyped the name? Like `dep-thing` when `Dep_Thing`
-            // was meant. So we try asking the registry for a `fuzzy` search for suggestions.
-            let mut candidates = Vec::new();
-            if let Err(e) = registry.query(&new_dep, &mut |s| candidates.push(s), true) {
-                return to_resolve_err(e);
-            };
-            candidates.sort_unstable_by_key(|a| a.name());
-            candidates.dedup_by(|a, b| a.name() == b.name());
-            let mut candidates: Vec<_> = candidates
-                .iter()
-                .map(|n| (lev_distance(&*new_dep.package_name(), &*n.name()), n))
-                .filter(|&(d, _)| d < 4)
-                .collect();
-            candidates.sort_by_key(|o| o.0);
-            let mut msg: String;
-            if candidates.is_empty() {
-                msg = format!("no matching package named `{}` found\n", dep.package_name());
+            msg = format!(
+                "no matching package found\nsearched package name: `{}`\n",
+                dep.package_name()
+            );
+
+            // If dependency package name is equal to the name of the candidate here
+            // it may be a prerelease package which hasn't been specified correctly
+            if dep.package_name() == candidates[0].1.name()
+                && candidates[0].1.package_id().version().is_prerelease()
+            {
+                msg.push_str("prerelease package needs to be specified explicitly\n");
+                msg.push_str(&format!(
+                    "{name} = {{ version = \"{version}\" }}",
+                    name = candidates[0].1.name(),
+                    version = candidates[0].1.package_id().version()
+                ));
             } else {
-                msg = format!(
-                    "no matching package found\nsearched package name: `{}`\n",
-                    dep.package_name()
-                );
+                let mut names = candidates
+                    .iter()
+                    .take(3)
+                    .map(|c| c.1.name().as_str())
+                    .collect::<Vec<_>>();
 
-                // If dependency package name is equal to the name of the candidate here
-                // it may be a prerelease package which hasn't been specified correctly
-                if dep.package_name() == candidates[0].1.name()
-                    && candidates[0].1.package_id().version().is_prerelease()
-                {
-                    msg.push_str("prerelease package needs to be specified explicitly\n");
-                    msg.push_str(&format!(
-                        "{name} = {{ version = \"{version}\" }}",
-                        name = candidates[0].1.name(),
-                        version = candidates[0].1.package_id().version()
-                    ));
-                } else {
-                    let mut names = candidates
+                if candidates.len() > 3 {
+                    names.push("...");
+                }
+                // Vertically align first suggestion with missing crate name
+                // so a typo jumps out at you.
+                msg.push_str("perhaps you meant:      ");
+                msg.push_str(
+                    &names
                         .iter()
-                        .take(3)
-                        .map(|c| c.1.name().as_str())
-                        .collect::<Vec<_>>();
-
-                    if candidates.len() > 3 {
-                        names.push("...");
-                    }
-                    // Vertically align first suggestion with missing crate name
-                    // so a typo jumps out at you.
-                    msg.push_str("perhaps you meant:      ");
-                    msg.push_str(&names.iter().enumerate().fold(
-                        String::default(),
-                        |acc, (i, el)| match i {
+                        .enumerate()
+                        .fold(String::default(), |acc, (i, el)| match i {
                             0 => acc + el,
                             i if names.len() - 1 == i && candidates.len() <= 3 => acc + " or " + el,
                             _ => acc + ", " + el,
-                        },
-                    ));
-                }
-                msg.push('\n');
+                        }),
+                );
             }
-            msg.push_str(&format!("location searched: {}\n", dep.source_id()));
-            msg.push_str("required by ");
-            msg.push_str(&describe_path_in_context(cx, &parent.package_id()));
+            msg.push('\n');
+        }
+        msg.push_str(&format!("location searched: {}\n", dep.source_id()));
+        msg.push_str("required by ");
+        msg.push_str(&describe_path_in_context(cx, &parent.package_id()));
 
-            msg
-        };
+        msg
+    };
 
     if let Some(config) = config {
         if config.offline() {
index 28b328132d287f3feba8eed030d54b29634531d6..000ded24652c117214714f4ac82fd0fa081e6d51 100644 (file)
@@ -58,6 +58,7 @@ use crate::core::PackageIdSpec;
 use crate::core::{Dependency, PackageId, Registry, Summary};
 use crate::util::config::Config;
 use crate::util::errors::CargoResult;
+use crate::util::network::PollExt;
 use crate::util::profile;
 
 use self::context::Context;
@@ -127,7 +128,6 @@ pub fn resolve(
     config: Option<&Config>,
     check_public_visible_dependencies: bool,
 ) -> CargoResult<Resolve> {
-    let cx = Context::new(check_public_visible_dependencies);
     let _p = profile::start("resolving");
     let minimal_versions = match config {
         Some(config) => config.cli_unstable().minimal_versions,
@@ -135,7 +135,15 @@ pub fn resolve(
     };
     let mut registry =
         RegistryQueryer::new(registry, replacements, version_prefs, minimal_versions);
-    let cx = activate_deps_loop(cx, &mut registry, summaries, config)?;
+    let cx = loop {
+        let cx = Context::new(check_public_visible_dependencies);
+        let cx = activate_deps_loop(cx, &mut registry, summaries, config)?;
+        if registry.reset_pending() {
+            break cx;
+        } else {
+            registry.registry.block_until_ready()?;
+        }
+    };
 
     let mut cksums = HashMap::new();
     for (summary, _) in cx.activations.values() {
@@ -854,6 +862,7 @@ fn generalize_conflicting(
             if let Some(others) = registry
                 .query(critical_parents_dep)
                 .expect("an already used dep now error!?")
+                .expect("an already used dep now pending!?")
                 .iter()
                 .rev() // the last one to be tried is the least likely to be in the cache, so start with that.
                 .map(|other| {
index 81009ea6529a4497ea0465c501c45e25c8feffdd..1319d5634397b1b9fc88439ba5711af976d6991d 100644 (file)
@@ -1,5 +1,6 @@
 use std::collections::hash_map::HashMap;
 use std::fmt;
+use std::task::Poll;
 
 use crate::core::package::PackageSet;
 use crate::core::{Dependency, Package, PackageId, Summary};
@@ -28,18 +29,21 @@ pub trait Source {
     fn requires_precise(&self) -> bool;
 
     /// Attempts to find the packages that match a dependency request.
-    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()>;
+    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll<CargoResult<()>>;
 
     /// Attempts to find the packages that are close to a dependency request.
     /// Each source gets to define what `close` means for it.
     /// Path/Git sources may return all dependencies that are at that URI,
     /// whereas an `Index` source may return dependencies that have the same canonicalization.
-    fn fuzzy_query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()>;
+    fn fuzzy_query(
+        &mut self,
+        dep: &Dependency,
+        f: &mut dyn FnMut(Summary),
+    ) -> Poll<CargoResult<()>>;
 
-    fn query_vec(&mut self, dep: &Dependency) -> CargoResult<Vec<Summary>> {
+    fn query_vec(&mut self, dep: &Dependency) -> Poll<CargoResult<Vec<Summary>>> {
         let mut ret = Vec::new();
-        self.query(dep, &mut |s| ret.push(s))?;
-        Ok(ret)
+        self.query(dep, &mut |s| ret.push(s)).map_ok(|_| ret)
     }
 
     /// Performs any network operations required to get the entire list of all names,
@@ -101,6 +105,9 @@ pub trait Source {
     /// Query if a package is yanked. Only registry sources can mark packages
     /// as yanked. This ignores the yanked whitelist.
     fn is_yanked(&mut self, _pkg: PackageId) -> CargoResult<bool>;
+
+    /// Block until all outstanding Poll::Pending requests are Poll::Ready.
+    fn block_until_ready(&mut self) -> CargoResult<()>;
 }
 
 pub enum MaybePackage {
@@ -130,12 +137,16 @@ impl<'a, T: Source + ?Sized + 'a> Source for Box<T> {
     }
 
     /// Forwards to `Source::query`.
-    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> {
+    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll<CargoResult<()>> {
         (**self).query(dep, f)
     }
 
     /// Forwards to `Source::query`.
-    fn fuzzy_query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> {
+    fn fuzzy_query(
+        &mut self,
+        dep: &Dependency,
+        f: &mut dyn FnMut(Summary),
+    ) -> Poll<CargoResult<()>> {
         (**self).fuzzy_query(dep, f)
     }
 
@@ -178,6 +189,10 @@ impl<'a, T: Source + ?Sized + 'a> Source for Box<T> {
     fn is_yanked(&mut self, pkg: PackageId) -> CargoResult<bool> {
         (**self).is_yanked(pkg)
     }
+
+    fn block_until_ready(&mut self) -> CargoResult<()> {
+        (**self).block_until_ready()
+    }
 }
 
 impl<'a, T: Source + ?Sized + 'a> Source for &'a mut T {
@@ -197,11 +212,15 @@ impl<'a, T: Source + ?Sized + 'a> Source for &'a mut T {
         (**self).requires_precise()
     }
 
-    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> {
+    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll<CargoResult<()>> {
         (**self).query(dep, f)
     }
 
-    fn fuzzy_query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> {
+    fn fuzzy_query(
+        &mut self,
+        dep: &Dependency,
+        f: &mut dyn FnMut(Summary),
+    ) -> Poll<CargoResult<()>> {
         (**self).fuzzy_query(dep, f)
     }
 
@@ -240,6 +259,10 @@ impl<'a, T: Source + ?Sized + 'a> Source for &'a mut T {
     fn is_yanked(&mut self, pkg: PackageId) -> CargoResult<bool> {
         (**self).is_yanked(pkg)
     }
+
+    fn block_until_ready(&mut self) -> CargoResult<()> {
+        (**self).block_until_ready()
+    }
 }
 
 /// A `HashMap` of `SourceId` -> `Box<Source>`.
index 834137715e16a6255042f731a7497e0b3744dcd7..cc872fd0a5be6b9483396f2c8531f5479a173d06 100644 (file)
@@ -4,6 +4,7 @@ use std::io::prelude::*;
 use std::io::SeekFrom;
 use std::path::{Path, PathBuf};
 use std::rc::Rc;
+use std::task::Poll;
 
 use anyhow::{bail, format_err, Context as _};
 use serde::{Deserialize, Serialize};
@@ -538,7 +539,12 @@ where
         source.update()?;
     }
 
-    let deps = source.query_vec(&dep)?;
+    let deps = loop {
+        match source.query_vec(&dep)? {
+            Poll::Ready(deps) => break deps,
+            Poll::Pending => source.block_until_ready()?,
+        }
+    };
     match deps.iter().map(|p| p.package_id()).max() {
         Some(pkgid) => {
             let pkg = Box::new(source).download_now(pkgid, config)?;
index 7a00b560f87d59bfefc17b58bf9ecc59da1ac576..f3e15dc05246a313ad15322a528f9f70ee2a60f2 100644 (file)
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::fmt::{self, Debug, Formatter};
 use std::path::{Path, PathBuf};
+use std::task::Poll;
 
 use crate::core::source::MaybePackage;
 use crate::core::{Dependency, Package, PackageId, Source, SourceId, Summary};
@@ -43,21 +44,25 @@ impl<'cfg> Debug for DirectorySource<'cfg> {
 }
 
 impl<'cfg> Source for DirectorySource<'cfg> {
-    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> {
+    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll<CargoResult<()>> {
         let packages = self.packages.values().map(|p| &p.0);
         let matches = packages.filter(|pkg| dep.matches(pkg.summary()));
         for summary in matches.map(|pkg| pkg.summary().clone()) {
             f(summary);
         }
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 
-    fn fuzzy_query(&mut self, _dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> {
+    fn fuzzy_query(
+        &mut self,
+        _dep: &Dependency,
+        f: &mut dyn FnMut(Summary),
+    ) -> Poll<CargoResult<()>> {
         let packages = self.packages.values().map(|p| &p.0);
         for summary in packages.map(|pkg| pkg.summary().clone()) {
             f(summary);
         }
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 
     fn supports_checksums(&self) -> bool {
@@ -205,4 +210,8 @@ impl<'cfg> Source for DirectorySource<'cfg> {
     fn is_yanked(&mut self, _pkg: PackageId) -> CargoResult<bool> {
         Ok(false)
     }
+
+    fn block_until_ready(&mut self) -> CargoResult<()> {
+        Ok(())
+    }
 }
index 9d7c42b829f1f40e947cab6f4c89ee480c8a50bd..27da1533744e8fb1f6445635dcc43f07d513dc50 100644 (file)
@@ -9,6 +9,7 @@ use crate::util::Config;
 use anyhow::Context;
 use log::trace;
 use std::fmt::{self, Debug, Formatter};
+use std::task::Poll;
 use url::Url;
 
 pub struct GitSource<'cfg> {
@@ -83,7 +84,7 @@ impl<'cfg> Debug for GitSource<'cfg> {
 }
 
 impl<'cfg> Source for GitSource<'cfg> {
-    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> {
+    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll<CargoResult<()>> {
         let src = self
             .path_source
             .as_mut()
@@ -91,7 +92,11 @@ impl<'cfg> Source for GitSource<'cfg> {
         src.query(dep, f)
     }
 
-    fn fuzzy_query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> {
+    fn fuzzy_query(
+        &mut self,
+        dep: &Dependency,
+        f: &mut dyn FnMut(Summary),
+    ) -> Poll<CargoResult<()>> {
         let src = self
             .path_source
             .as_mut()
@@ -212,6 +217,10 @@ impl<'cfg> Source for GitSource<'cfg> {
     fn is_yanked(&mut self, _pkg: PackageId) -> CargoResult<bool> {
         Ok(false)
     }
+
+    fn block_until_ready(&mut self) -> CargoResult<()> {
+        Ok(())
+    }
 }
 
 #[cfg(test)]
index cc1c9874179b64af614ee398e2a17a461fb9f5e3..d1d461f7d730c8f979baf16e150fe280fc057180 100644 (file)
@@ -1,6 +1,7 @@
 use std::collections::HashSet;
 use std::fmt::{self, Debug, Formatter};
 use std::path::{Path, PathBuf};
+use std::task::Poll;
 
 use crate::core::source::MaybePackage;
 use crate::core::{Dependency, Package, PackageId, Source, SourceId, Summary};
@@ -486,20 +487,24 @@ impl<'cfg> Debug for PathSource<'cfg> {
 }
 
 impl<'cfg> Source for PathSource<'cfg> {
-    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> {
+    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll<CargoResult<()>> {
         for s in self.packages.iter().map(|p| p.summary()) {
             if dep.matches(s) {
                 f(s.clone())
             }
         }
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 
-    fn fuzzy_query(&mut self, _dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> {
+    fn fuzzy_query(
+        &mut self,
+        _dep: &Dependency,
+        f: &mut dyn FnMut(Summary),
+    ) -> Poll<CargoResult<()>> {
         for s in self.packages.iter().map(|p| p.summary()) {
             f(s.clone())
         }
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 
     fn supports_checksums(&self) -> bool {
@@ -558,4 +563,8 @@ impl<'cfg> Source for PathSource<'cfg> {
     fn is_yanked(&mut self, _pkg: PackageId) -> CargoResult<bool> {
         Ok(false)
     }
+
+    fn block_until_ready(&mut self) -> CargoResult<()> {
+        Ok(())
+    }
 }
index 1394d9c1ab64eebdfbd5c7fc926ba8090b4bcaff..27fbbb50f6dfee4861ab3a2a9537e36d297145c7 100644 (file)
@@ -80,6 +80,7 @@ use std::convert::TryInto;
 use std::fs;
 use std::path::Path;
 use std::str;
+use std::task::Poll;
 
 /// Crates.io treats hyphen and underscores as interchangeable, but the index and old Cargo do not.
 /// Therefore, the index must store uncanonicalized version of the name so old Cargo's can find it.
@@ -263,16 +264,18 @@ impl<'cfg> RegistryIndex<'cfg> {
     }
 
     /// Returns the hash listed for a specified `PackageId`.
-    pub fn hash(&mut self, pkg: PackageId, load: &mut dyn RegistryData) -> CargoResult<&str> {
+    pub fn hash(&mut self, pkg: PackageId, load: &mut dyn RegistryData) -> Poll<CargoResult<&str>> {
         let req = OptVersionReq::exact(pkg.version());
-        let summary = self
-            .summaries(pkg.name(), &req, load)?
-            .next()
-            .ok_or_else(|| internal(format!("no hash listed for {}", pkg)))?;
-        summary
+        let summary = self.summaries(pkg.name(), &req, load)?;
+        let summary = match summary {
+            Poll::Ready(mut summary) => summary.next(),
+            Poll::Pending => return Poll::Pending,
+        };
+        Poll::Ready(Ok(summary
+            .ok_or_else(|| internal(format!("no hash listed for {}", pkg)))?
             .summary
             .checksum()
-            .ok_or_else(|| internal(format!("no hash listed for {}", pkg)))
+            .ok_or_else(|| internal(format!("no hash listed for {}", pkg)))?))
     }
 
     /// Load a list of summaries for `name` package in this registry which
@@ -287,7 +290,7 @@ impl<'cfg> RegistryIndex<'cfg> {
         name: InternedString,
         req: &'b OptVersionReq,
         load: &mut dyn RegistryData,
-    ) -> CargoResult<impl Iterator<Item = &'a IndexSummary> + 'b>
+    ) -> Poll<CargoResult<impl Iterator<Item = &'a IndexSummary> + 'b>>
     where
         'a: 'b,
     {
@@ -298,7 +301,10 @@ impl<'cfg> RegistryIndex<'cfg> {
         // has run previously this will parse a Cargo-specific cache file rather
         // than the registry itself. In effect this is intended to be a quite
         // cheap operation.
-        let summaries = self.load_summaries(name, load)?;
+        let summaries = match self.load_summaries(name, load)? {
+            Poll::Ready(x) => x,
+            Poll::Pending => return Poll::Pending,
+        };
 
         // Iterate over our summaries, extract all relevant ones which match our
         // version requirement, and then parse all corresponding rows in the
@@ -307,7 +313,7 @@ impl<'cfg> RegistryIndex<'cfg> {
         // minimize the amount of work being done here and parse as little as
         // necessary.
         let raw_data = &summaries.raw_data;
-        Ok(summaries
+        Poll::Ready(Ok(summaries
             .versions
             .iter_mut()
             .filter_map(move |(k, v)| if req.matches(k) { Some(v) } else { None })
@@ -332,18 +338,18 @@ impl<'cfg> RegistryIndex<'cfg> {
                 } else {
                     true
                 }
-            }))
+            })))
     }
 
     fn load_summaries(
         &mut self,
         name: InternedString,
         load: &mut dyn RegistryData,
-    ) -> CargoResult<&mut Summaries> {
+    ) -> Poll<CargoResult<&mut Summaries>> {
         // If we've previously loaded what versions are present for `name`, just
         // return that since our cache should still be valid.
         if self.summaries_cache.contains_key(&name) {
-            return Ok(self.summaries_cache.get_mut(&name).unwrap());
+            return Poll::Ready(Ok(self.summaries_cache.get_mut(&name).unwrap()));
         }
 
         // Prepare the `RegistryData` which will lazily initialize internal data
@@ -363,12 +369,14 @@ impl<'cfg> RegistryIndex<'cfg> {
             .collect::<String>();
         let raw_path = make_dep_path(&fs_name, false);
 
+        let mut any_pending = false;
+
         // Attempt to handle misspellings by searching for a chain of related
         // names to the original `raw_path` name. Only return summaries
         // associated with the first hit, however. The resolver will later
         // reject any candidates that have the wrong name, and with this it'll
         // along the way produce helpful "did you mean?" suggestions.
-        for path in UncanonicalizedIter::new(&raw_path).take(1024) {
+        for (i, path) in UncanonicalizedIter::new(&raw_path).take(1024).enumerate() {
             let summaries = Summaries::parse(
                 index_version.as_deref(),
                 root,
@@ -378,16 +386,30 @@ impl<'cfg> RegistryIndex<'cfg> {
                 load,
                 self.config,
             )?;
-            if let Some(summaries) = summaries {
+            if summaries.is_pending() {
+                if i == 0 {
+                    // If we have not herd back about the name as requested
+                    // then don't ask about other spellings yet.
+                    // This prevents us spamming all the variations in the
+                    // case where we have the correct spelling.
+                    return Poll::Pending;
+                }
+                any_pending = true;
+            }
+            if let Poll::Ready(Some(summaries)) = summaries {
                 self.summaries_cache.insert(name, summaries);
-                return Ok(self.summaries_cache.get_mut(&name).unwrap());
+                return Poll::Ready(Ok(self.summaries_cache.get_mut(&name).unwrap()));
             }
         }
 
+        if any_pending {
+            return Poll::Pending;
+        }
+
         // If nothing was found then this crate doesn't exists, so just use an
         // empty `Summaries` list.
         self.summaries_cache.insert(name, Summaries::default());
-        Ok(self.summaries_cache.get_mut(&name).unwrap())
+        Poll::Ready(Ok(self.summaries_cache.get_mut(&name).unwrap()))
     }
 
     pub fn query_inner(
@@ -396,11 +418,12 @@ impl<'cfg> RegistryIndex<'cfg> {
         load: &mut dyn RegistryData,
         yanked_whitelist: &HashSet<PackageId>,
         f: &mut dyn FnMut(Summary),
-    ) -> CargoResult<()> {
+    ) -> Poll<CargoResult<()>> {
         if self.config.offline()
-            && self.query_inner_with_online(dep, load, yanked_whitelist, f, false)? != 0
+            && self.query_inner_with_online(dep, load, yanked_whitelist, f, false)?
+                != Poll::Ready(0)
         {
-            return Ok(());
+            return Poll::Ready(Ok(()));
             // If offline, and there are no matches, try again with online.
             // This is necessary for dependencies that are not used (such as
             // target-cfg or optional), but are not downloaded. Normally the
@@ -410,8 +433,8 @@ impl<'cfg> RegistryIndex<'cfg> {
             // indicating that the required dependency is unavailable while
             // offline will be displayed.
         }
-        self.query_inner_with_online(dep, load, yanked_whitelist, f, true)?;
-        Ok(())
+        self.query_inner_with_online(dep, load, yanked_whitelist, f, true)
+            .map_ok(|_| ())
     }
 
     fn query_inner_with_online(
@@ -421,10 +444,15 @@ impl<'cfg> RegistryIndex<'cfg> {
         yanked_whitelist: &HashSet<PackageId>,
         f: &mut dyn FnMut(Summary),
         online: bool,
-    ) -> CargoResult<usize> {
+    ) -> Poll<CargoResult<usize>> {
         let source_id = self.source_id;
-        let summaries = self
-            .summaries(dep.package_name(), dep.version_req(), load)?
+
+        let summaries = match self.summaries(dep.package_name(), dep.version_req(), load)? {
+            Poll::Ready(x) => x,
+            Poll::Pending => return Poll::Pending,
+        };
+
+        let summaries = summaries
             // First filter summaries for `--offline`. If we're online then
             // everything is a candidate, otherwise if we're offline we're only
             // going to consider candidates which are actually present on disk.
@@ -489,15 +517,19 @@ impl<'cfg> RegistryIndex<'cfg> {
             f(summary);
             count += 1;
         }
-        Ok(count)
+        Poll::Ready(Ok(count))
     }
 
-    pub fn is_yanked(&mut self, pkg: PackageId, load: &mut dyn RegistryData) -> CargoResult<bool> {
+    pub fn is_yanked(
+        &mut self,
+        pkg: PackageId,
+        load: &mut dyn RegistryData,
+    ) -> Poll<CargoResult<bool>> {
         let req = OptVersionReq::exact(pkg.version());
         let found = self
-            .summaries(pkg.name(), &req, load)?
-            .any(|summary| summary.yanked);
-        Ok(found)
+            .summaries(pkg.name(), &req, load)
+            .map_ok(|mut p| p.any(|summary| summary.yanked));
+        found
     }
 }
 
@@ -531,7 +563,7 @@ impl Summaries {
         source_id: SourceId,
         load: &mut dyn RegistryData,
         config: &Config,
-    ) -> CargoResult<Option<Summaries>> {
+    ) -> Poll<CargoResult<Option<Summaries>>> {
         // First up, attempt to load the cache. This could fail for all manner
         // of reasons, but consider all of them non-fatal and just log their
         // occurrence in case anyone is debugging anything.
@@ -545,7 +577,7 @@ impl Summaries {
                         if cfg!(debug_assertions) {
                             cache_contents = Some(s.raw_data);
                         } else {
-                            return Ok(Some(s));
+                            return Poll::Ready(Ok(Some(s)));
                         }
                     }
                     Err(e) => {
@@ -598,14 +630,19 @@ impl Summaries {
             Ok(())
         });
 
+        if matches!(err, Poll::Pending) {
+            assert!(!hit_closure);
+            return Poll::Pending;
+        }
+
         // We ignore lookup failures as those are just crates which don't exist
         // or we haven't updated the registry yet. If we actually ran the
         // closure though then we care about those errors.
         if !hit_closure {
             debug_assert!(cache_contents.is_none());
-            return Ok(None);
+            return Poll::Ready(Ok(None));
         }
-        err?;
+        let _ = err?;
 
         // If we've got debug assertions enabled and the cache was previously
         // present and considered fresh this is where the debug assertions
@@ -636,7 +673,7 @@ impl Summaries {
             }
         }
 
-        Ok(Some(ret))
+        Poll::Ready(Ok(Some(ret)))
     }
 
     /// Parses an open `File` which represents information previously cached by
index cccc553ee9ed18b7bca17bede4bafab92c170b47..cfde8a51a1c446b2dd4d594db59bdfc427b3f2e9 100644 (file)
@@ -8,6 +8,7 @@ use std::fs::File;
 use std::io::prelude::*;
 use std::io::SeekFrom;
 use std::path::Path;
+use std::task::Poll;
 
 /// A local registry is a registry that lives on the filesystem as a set of
 /// `.crate` files with an `index` directory in the same format as a remote
@@ -54,8 +55,8 @@ impl<'cfg> RegistryData for LocalRegistry<'cfg> {
         root: &Path,
         path: &Path,
         data: &mut dyn FnMut(&[u8]) -> CargoResult<()>,
-    ) -> CargoResult<()> {
-        data(&paths::read_bytes(&root.join(path))?)
+    ) -> Poll<CargoResult<()>> {
+        Poll::Ready(Ok(data(&paths::read_bytes(&root.join(path))?)?))
     }
 
     fn config(&mut self) -> CargoResult<Option<RegistryConfig>> {
@@ -120,4 +121,8 @@ impl<'cfg> RegistryData for LocalRegistry<'cfg> {
     ) -> CargoResult<File> {
         panic!("this source doesn't download")
     }
+
+    fn block_until_ready(&mut self) -> CargoResult<()> {
+        Ok(())
+    }
 }
index d9df11bbfd27c58e3882e25429155019cb049070..913da02ba88587f9c59137d1d41aac207752f198 100644 (file)
@@ -164,6 +164,7 @@ use std::collections::HashSet;
 use std::fs::{File, OpenOptions};
 use std::io::Write;
 use std::path::{Path, PathBuf};
+use std::task::Poll;
 
 use anyhow::Context as _;
 use flate2::read::GzDecoder;
@@ -179,6 +180,7 @@ use crate::sources::PathSource;
 use crate::util::hex;
 use crate::util::interning::InternedString;
 use crate::util::into_url::IntoUrl;
+use crate::util::network::PollExt;
 use crate::util::{restricted_names, CargoResult, Config, Filesystem, OptVersionReq};
 
 const PACKAGE_SOURCE_LOCK: &str = ".cargo-ok";
@@ -440,12 +442,14 @@ pub trait RegistryData {
     /// * `root` is the root path to the index.
     /// * `path` is the relative path to the package to load (like `ca/rg/cargo`).
     /// * `data` is a callback that will receive the raw bytes of the index JSON file.
+    ///
+    /// If `load` returns a `Poll::Pending` then it must not have called data.
     fn load(
         &self,
         root: &Path,
         path: &Path,
         data: &mut dyn FnMut(&[u8]) -> CargoResult<()>,
-    ) -> CargoResult<()>;
+    ) -> Poll<CargoResult<()>>;
 
     /// Loads the `config.json` file and returns it.
     ///
@@ -508,6 +512,9 @@ pub trait RegistryData {
     ///
     /// This is used by index caching to check if the cache is out of date.
     fn current_version(&self) -> Option<InternedString>;
+
+    /// Block until all outstanding Poll::Pending requests are Poll::Ready.
+    fn block_until_ready(&mut self) -> CargoResult<()>;
 }
 
 /// The status of [`RegistryData::download`] which indicates if a `.crate`
@@ -678,6 +685,7 @@ impl<'cfg> RegistrySource<'cfg> {
         let summary_with_cksum = self
             .index
             .summaries(package.name(), &req, &mut *self.ops)?
+            .expect("a downloaded dep now pending!?")
             .map(|s| s.summary.clone())
             .next()
             .expect("summary not found");
@@ -692,7 +700,7 @@ impl<'cfg> RegistrySource<'cfg> {
 }
 
 impl<'cfg> Source for RegistrySource<'cfg> {
-    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> {
+    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll<CargoResult<()>> {
         // If this is a precise dependency, then it came from a lock file and in
         // theory the registry is known to contain this version. If, however, we
         // come back with no summaries, then our registry may need to be
@@ -700,15 +708,19 @@ impl<'cfg> Source for RegistrySource<'cfg> {
         if dep.source_id().precise().is_some() && !self.updated {
             debug!("attempting query without update");
             let mut called = false;
-            self.index
-                .query_inner(dep, &mut *self.ops, &self.yanked_whitelist, &mut |s| {
-                    if dep.matches(&s) {
-                        called = true;
-                        f(s);
-                    }
-                })?;
+            let pend =
+                self.index
+                    .query_inner(dep, &mut *self.ops, &self.yanked_whitelist, &mut |s| {
+                        if dep.matches(&s) {
+                            called = true;
+                            f(s);
+                        }
+                    })?;
+            if pend.is_pending() {
+                return Poll::Pending;
+            }
             if called {
-                return Ok(());
+                return Poll::Ready(Ok(()));
             } else {
                 debug!("falling back to an update");
                 self.do_update()?;
@@ -723,7 +735,11 @@ impl<'cfg> Source for RegistrySource<'cfg> {
             })
     }
 
-    fn fuzzy_query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> {
+    fn fuzzy_query(
+        &mut self,
+        dep: &Dependency,
+        f: &mut dyn FnMut(Summary),
+    ) -> Poll<CargoResult<()>> {
         self.index
             .query_inner(dep, &mut *self.ops, &self.yanked_whitelist, f)
     }
@@ -757,7 +773,10 @@ impl<'cfg> Source for RegistrySource<'cfg> {
     }
 
     fn download(&mut self, package: PackageId) -> CargoResult<MaybePackage> {
-        let hash = self.index.hash(package, &mut *self.ops)?;
+        let hash = self
+            .index
+            .hash(package, &mut *self.ops)?
+            .expect("we got to downloading a dep while pending!?");
         match self.ops.download(package, hash)? {
             MaybeLock::Ready(file) => self.get_pkg(package, &file).map(MaybePackage::Ready),
             MaybeLock::Download { url, descriptor } => {
@@ -767,7 +786,10 @@ impl<'cfg> Source for RegistrySource<'cfg> {
     }
 
     fn finish_download(&mut self, package: PackageId, data: Vec<u8>) -> CargoResult<Package> {
-        let hash = self.index.hash(package, &mut *self.ops)?;
+        let hash = self
+            .index
+            .hash(package, &mut *self.ops)?
+            .expect("we got to downloading a dep while pending!?");
         let file = self.ops.finish_download(package, hash, &data)?;
         self.get_pkg(package, &file)
     }
@@ -788,6 +810,19 @@ impl<'cfg> Source for RegistrySource<'cfg> {
         if !self.updated {
             self.do_update()?;
         }
-        self.index.is_yanked(pkg, &mut *self.ops)
+        loop {
+            match self.index.is_yanked(pkg, &mut *self.ops)? {
+                Poll::Ready(yanked) => {
+                    return Ok(yanked);
+                }
+                Poll::Pending => {
+                    self.block_until_ready()?;
+                }
+            }
+        }
+    }
+
+    fn block_until_ready(&mut self) -> CargoResult<()> {
+        self.ops.block_until_ready()
     }
 }
index f3bc0edb53a28f161e2c30022d15dd93161591db..e54416471d546dd95ff8ffc17d4421249b88d7e8 100644 (file)
@@ -7,6 +7,7 @@ use crate::sources::registry::{
 };
 use crate::util::errors::CargoResult;
 use crate::util::interning::InternedString;
+use crate::util::network::PollExt;
 use crate::util::{Config, Filesystem};
 use anyhow::Context as _;
 use cargo_util::{paths, registry::make_dep_path, Sha256};
@@ -20,6 +21,7 @@ use std::io::SeekFrom;
 use std::mem;
 use std::path::Path;
 use std::str;
+use std::task::Poll;
 
 /// A remote registry is a registry that lives at a remote URL (such as
 /// crates.io). The git index is cloned locally, and `.crate` files are
@@ -168,7 +170,7 @@ impl<'cfg> RegistryData for RemoteRegistry<'cfg> {
         _root: &Path,
         path: &Path,
         data: &mut dyn FnMut(&[u8]) -> CargoResult<()>,
-    ) -> CargoResult<()> {
+    ) -> Poll<CargoResult<()>> {
         // Note that the index calls this method and the filesystem is locked
         // in the index, so we don't need to worry about an `update_index`
         // happening in a different process.
@@ -178,9 +180,15 @@ impl<'cfg> RegistryData for RemoteRegistry<'cfg> {
         let object = entry.to_object(repo)?;
         let blob = match object.as_blob() {
             Some(blob) => blob,
-            None => anyhow::bail!("path `{}` is not a blob in the git repo", path.display()),
+            None => {
+                return Err(anyhow::anyhow!(
+                    "path `{}` is not a blob in the git repo",
+                    path.display()
+                ))
+                .into()
+            }
         };
-        data(blob.content())
+        Poll::Ready(Ok(data(blob.content())?))
     }
 
     fn config(&mut self) -> CargoResult<Option<RegistryConfig>> {
@@ -191,7 +199,8 @@ impl<'cfg> RegistryData for RemoteRegistry<'cfg> {
         self.load(Path::new(""), Path::new("config.json"), &mut |json| {
             config = Some(serde_json::from_slice(json)?);
             Ok(())
-        })?;
+        })
+        .expect("git registries never return pending")?;
         trace!("config loaded");
         Ok(config)
     }
@@ -329,6 +338,10 @@ impl<'cfg> RegistryData for RemoteRegistry<'cfg> {
         }
         false
     }
+
+    fn block_until_ready(&mut self) -> CargoResult<()> {
+        Ok(())
+    }
 }
 
 impl<'cfg> Drop for RemoteRegistry<'cfg> {
index 468df095cd6e864a8ed0ac92a0dfdbee3b21d570..52837e27fdcba9d141448336d843e658b7faafdf 100644 (file)
@@ -1,6 +1,7 @@
 use crate::core::source::MaybePackage;
 use crate::core::{Dependency, Package, PackageId, Source, SourceId, Summary};
 use crate::util::errors::CargoResult;
+use std::task::Poll;
 
 use anyhow::Context as _;
 
@@ -41,7 +42,7 @@ impl<'cfg> Source for ReplacedSource<'cfg> {
         self.inner.requires_precise()
     }
 
-    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> {
+    fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll<CargoResult<()>> {
         let (replace_with, to_replace) = (self.replace_with, self.to_replace);
         let dep = dep.clone().map_source(to_replace, replace_with);
 
@@ -49,11 +50,19 @@ impl<'cfg> Source for ReplacedSource<'cfg> {
             .query(&dep, &mut |summary| {
                 f(summary.map_source(replace_with, to_replace))
             })
-            .with_context(|| format!("failed to query replaced source {}", self.to_replace))?;
-        Ok(())
+            .map_err(|e| {
+                e.context(format!(
+                    "failed to query replaced source {}",
+                    self.to_replace
+                ))
+            })
     }
 
-    fn fuzzy_query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> {
+    fn fuzzy_query(
+        &mut self,
+        dep: &Dependency,
+        f: &mut dyn FnMut(Summary),
+    ) -> Poll<CargoResult<()>> {
         let (replace_with, to_replace) = (self.replace_with, self.to_replace);
         let dep = dep.clone().map_source(to_replace, replace_with);
 
@@ -61,8 +70,12 @@ impl<'cfg> Source for ReplacedSource<'cfg> {
             .fuzzy_query(&dep, &mut |summary| {
                 f(summary.map_source(replace_with, to_replace))
             })
-            .with_context(|| format!("failed to query replaced source {}", self.to_replace))?;
-        Ok(())
+            .map_err(|e| {
+                e.context(format!(
+                    "failed to query replaced source {}",
+                    self.to_replace
+                ))
+            })
     }
 
     fn update(&mut self) -> CargoResult<()> {
@@ -127,4 +140,8 @@ impl<'cfg> Source for ReplacedSource<'cfg> {
     fn is_yanked(&mut self, pkg: PackageId) -> CargoResult<bool> {
         self.inner.is_yanked(pkg)
     }
+
+    fn block_until_ready(&mut self) -> CargoResult<()> {
+        self.inner.block_until_ready()
+    }
 }
index 2a590bc1364ac658bdc9bdcb6a440df54dfba67e..07c7ceae1abff1703dfc00497c749e244f4c30d0 100644 (file)
@@ -2,6 +2,21 @@ use anyhow::Error;
 
 use crate::util::errors::{CargoResult, HttpNot200};
 use crate::util::Config;
+use std::task::Poll;
+
+pub trait PollExt<T> {
+    fn expect(self, msg: &str) -> T;
+}
+
+impl<T> PollExt<T> for Poll<T> {
+    #[track_caller]
+    fn expect(self, msg: &str) -> T {
+        match self {
+            Poll::Ready(val) => val,
+            Poll::Pending => panic!("{}", msg),
+        }
+    }
+}
 
 pub struct Retry<'a> {
     config: &'a Config,