8 use anyhow
::{bail, format_err, Error}
;
9 use flate2
::bufread
::GzDecoder
;
11 use proxmox_http
::{client::sync::Client, HttpClient, HttpOptions}
;
12 use proxmox_sys
::fs
::file_get_contents
;
15 config
::{MirrorConfig, SubscriptionKey}
,
18 types
::{Snapshot, SNAPSHOT_REGEX}
,
19 FetchResult
, Progress
,
23 CheckSums
, CompressionType
, FileReference
, FileReferenceType
, PackagesFile
, ReleaseFile
,
25 repositories
::{APTRepository, APTRepositoryPackageType}
,
30 fn mirror_dir(config
: &MirrorConfig
) -> String
{
31 format
!("{}/{}", config
.base_dir
, config
.id
)
34 pub(crate) fn pool(config
: &MirrorConfig
) -> Result
<Pool
, Error
> {
35 let pool_dir
= format
!("{}/.pool", config
.base_dir
);
36 Pool
::open(Path
::new(&mirror_dir(config
)), Path
::new(&pool_dir
))
39 /// `MirrorConfig`, but some fields converted/parsed into usable types.
// NOTE(review): this extracted chunk is missing the original lines 43-46 and 48,
// so further fields of this struct are not visible here.
40 struct ParsedMirrorConfig
{
// Parsed APT repository definition this mirror tracks.
41 pub repository
: APTRepository
,
// Architectures whose indices/packages should be mirrored.
42 pub architectures
: Vec
<String
>,
// Optional value for the HTTP `Authorization` header sent on fetches
// (inserted into the header map in `fetch_repo_file`).
47 pub auth
: Option
<String
>,
// When set, individual package fetch errors are reported but do not abort
// snapshot creation (see the `Err(err) if config.ignore_errors` arm in
// `create_snapshot`).
49 pub ignore_errors
: bool
,
// Conversion of the raw `MirrorConfig` into the parsed, ready-to-use form.
// NOTE(review): several original lines are missing from this extracted chunk
// (e.g. 57, 59, 61, 64, 66, 68, 70, 72-77 and 79-81), so parts of the body and
// most of the struct initializer are not visible here.
52 impl TryInto
<ParsedMirrorConfig
> for MirrorConfig
{
53 type Error
= anyhow
::Error
;
55 fn try_into(self) -> Result
<ParsedMirrorConfig
, Self::Error
> {
// Open the pool backing this mirror (fails if it cannot be opened).
56 let pool
= pool(&self)?
;
// Parse the one-line APT repository definition.
58 let repository
= convert_repo_line(self.repository
.clone())?
;
// Read the repository (signing) key file into memory.
60 let key
= file_get_contents(Path
::new(&self.key_path
))?
;
62 let options
= HttpOptions
{
63 user_agent
: Some("proxmox-offline-mirror 0.1".to_string()),
65 }; // TODO actually read version ;)
// HTTP client used for all fetches of this mirror.
67 let client
= Client
::new(options
);
69 Ok(ParsedMirrorConfig
{
71 architectures
: self.architectures
,
78 ignore_errors
: self.ignore_errors
,
83 // Helper to get absolute URL for dist-specific relative `path`.
84 fn get_dist_url(repo
: &APTRepository
, path
: &str) -> String
{
85 let dist_root
= format
!("{}/dists/{}", repo
.uris
[0], repo
.suites
[0]);
87 format
!("{}/{}", dist_root
, path
)
90 // Helper to get dist-specific path given a `prefix` (snapshot dir) and relative `path`.
// NOTE(review): original lines 93 and 95-96 are missing from this chunk; only
// part of the path construction is visible here.
91 fn get_dist_path(repo
: &APTRepository
, prefix
: &Path
, path
: &str) -> PathBuf
{
// Start from the snapshot prefix directory.
92 let mut base
= PathBuf
::from(prefix
);
// Append the (first) configured suite.
94 base
.push(&repo
.suites
[0]);
99 // Helper to get generic URL given a `repo` and `path`.
100 fn get_repo_url(repo
: &APTRepository
, path
: &str) -> String
{
101 format
!("{}/{}", repo
.uris
[0], path
)
104 /// Helper to fetch file from URI and optionally verify the responses checksum.
106 /// Only fetches and returns data, doesn't store anything anywhere.
// NOTE(review): the remaining signature lines (original 107-110, 112) are
// missing from this chunk — presumably `client`, `uri`, `max_size` and `auth`
// parameters; confirm against the full file.
111 checksums
: Option
<&CheckSums
>,
113 ) -> Result
<FetchResult
, Error
> {
114 println
!("-> GET '{}'..", uri
);
// Build an Authorization header map only when credentials were given.
116 let headers
= if let Some(auth
) = auth
{
117 let mut map
= HashMap
::new();
118 map
.insert("Authorization".to_string(), auth
.to_string());
124 let response
= client
.get(uri
, headers
.as_ref())?
;
// Cap the response body at `max_size` to bound memory usage.
126 let reader
: Box
<dyn Read
> = response
.into_body();
127 let mut reader
= reader
.take(max_size
as u64);
128 let mut data
= Vec
::new();
129 reader
.read_to_end(&mut data
)?
;
// Verify the downloaded data against the expected checksums, if provided.
131 if let Some(checksums
) = checksums
{
132 checksums
.verify(&data
)?
;
141 /// Helper to fetch InRelease (`detached` == false) or Release/Release.gpg (`detached` == true) files from repository.
143 /// Verifies the contained/detached signature, stores all fetched files under `prefix`, and returns the verified raw release file data.
// NOTE(review): many original lines are missing from this extracted chunk
// (signature parameters, early-return bodies, struct fields), so the comments
// below only describe the visible fragments.
145 config
: &ParsedMirrorConfig
,
149 ) -> Result
<FetchResult
, Error
> {
// Detached mode: fetch Release + Release.gpg; otherwise a single InRelease.
150 let (name
, fetched
, sig
) = if detached
{
151 println
!("Fetching Release/Release.gpg files");
152 let sig
= fetch_repo_file(
154 &get_dist_url(&config
.repository
, "Release.gpg"),
157 config
.auth
.as_deref(),
159 let mut fetched
= fetch_repo_file(
161 &get_dist_url(&config
.repository
, "Release"),
164 config
.auth
.as_deref(),
// Account for both downloads in the fetched-bytes counter.
166 fetched
.fetched
+= sig
.fetched
;
167 ("Release(.gpg)", fetched
, Some(sig
.data()))
169 println
!("Fetching InRelease file");
170 let fetched
= fetch_repo_file(
172 &get_dist_url(&config
.repository
, "InRelease"),
175 config
.auth
.as_deref(),
177 ("InRelease", fetched
, None
)
// Verify the (inline or detached) signature with the configured key.
180 println
!("Verifying '{name}' signature using provided repository key..");
181 let content
= fetched
.data_ref();
182 let verified
= helpers
::verify_signature(content
, &config
.key
, sig
.as_deref())?
;
// Compute checksums of the verified content for pool storage/lookup.
185 let sha512
= Some(openssl
::sha
::sha512(content
));
186 let csums
= CheckSums
{
192 return Ok(FetchResult
{
194 fetched
: fetched
.fetched
,
// Persist the release file into the pool (if not already present) and link
// it at its snapshot path.
198 let locked
= &config
.pool
.lock()?
;
200 if !locked
.contains(&csums
) {
201 locked
.add_file(content
, &csums
, config
.sync
)?
;
207 Path
::new(&get_dist_path(&config
.repository
, prefix
, "Release")),
// Detached case: also persist and link the signature file.
209 let sig
= sig
.unwrap();
210 let sha512
= Some(openssl
::sha
::sha512(&sig
));
211 let csums
= CheckSums
{
215 if !locked
.contains(&csums
) {
216 locked
.add_file(&sig
, &csums
, config
.sync
)?
;
220 Path
::new(&get_dist_path(&config
.repository
, prefix
, "Release.gpg")),
225 Path
::new(&get_dist_path(&config
.repository
, prefix
, "InRelease")),
231 fetched
: fetched
.fetched
,
235 /// Helper to fetch an index file referenced by a `ReleaseFile`.
237 /// Since these usually come in compressed and uncompressed form, with the latter often not actually existing in the source repository as file, this fetches and if necessary decompresses to obtain a copy of the uncompressed data.
238 /// Will skip fetching if both references are already available with the expected checksum in the pool, in which case they will just be re-linked under the new path.
240 /// Returns the uncompressed data.
// NOTE(review): many original lines are missing from this extracted chunk
// (remaining signature parameters such as `prefix`/`by_hash`, match arms,
// closing braces); comments below only cover the visible fragments.
242 config
: &ParsedMirrorConfig
,
244 reference
: &FileReference
,
245 uncompressed
: Option
<&FileReference
>,
248 ) -> Result
<FetchResult
, Error
> {
// Remote URL and local snapshot path of the (possibly compressed) reference.
249 let url
= get_dist_url(&config
.repository
, &reference
.path
);
250 let path
= get_dist_path(&config
.repository
, prefix
, &reference
.path
);
// Fast path: both variants already pooled -> just re-link, no fetching.
252 if let Some(uncompressed
) = uncompressed
{
253 let uncompressed_path
= get_dist_path(&config
.repository
, prefix
, &uncompressed
.path
);
255 if config
.pool
.contains(&reference
.checksums
)
256 && config
.pool
.contains(&uncompressed
.checksums
)
260 .get_contents(&uncompressed
.checksums
, config
.verify
)?
;
// Nothing was downloaded, hence `fetched: 0`.
263 return Ok(FetchResult { data, fetched: 0 }
);
265 // Ensure they're linked at current path
266 config
.pool
.lock()?
.link_file(&reference
.checksums
, &path
)?
;
270 .link_file(&uncompressed
.checksums
, &uncompressed_path
)?
;
271 return Ok(FetchResult { data, fetched: 0 }
);
// Candidate URLs: prefer by-hash URLs (SHA512, then SHA256) when supported.
275 let urls
= if by_hash
{
276 let mut urls
= Vec
::new();
277 if let Some((base_url
, _file_name
)) = url
.rsplit_once('
/'
) {
278 if let Some(sha512
) = reference
.checksums
.sha512
{
279 urls
.push(format
!("{base_url}/by-hash/SHA512/{}", hex
::encode(sha512
)));
281 if let Some(sha256
) = reference
.checksums
.sha256
{
282 urls
.push(format
!("{base_url}/by-hash/SHA256/{}", hex
::encode(sha256
)));
// Try each candidate URL in turn, keeping the first success.
293 .fold(None
, |res
, url
| match res
{
294 Some(Ok(res
)) => Some(Ok(res
)),
295 _
=> Some(fetch_plain_file(
300 &reference
.checksums
,
305 .ok_or_else(|| format_err
!("Failed to retrieve {}", reference
.path
))??
;
307 let mut buf
= Vec
::new();
308 let raw
= res
.data_ref();
// Decompress according to the reference's declared compression type.
310 let decompressed
= match reference
.file_type
.compression() {
312 Some(CompressionType
::Gzip
) => {
313 let mut gz
= GzDecoder
::new(raw
);
314 gz
.read_to_end(&mut buf
)?
;
317 Some(CompressionType
::Bzip2
) => {
318 let mut bz
= bzip2
::read
::BzDecoder
::new(raw
);
319 bz
.read_to_end(&mut buf
)?
;
322 Some(CompressionType
::Lzma
) | Some(CompressionType
::Xz
) => {
// multi-decoder: xz index files may consist of multiple streams.
323 let mut xz
= xz2
::read
::XzDecoder
::new_multi_decoder(raw
);
324 xz
.read_to_end(&mut buf
)?
;
// Result carries the uncompressed data but the actually-fetched byte count.
328 let res
= FetchResult
{
329 data
: decompressed
.to_owned(),
330 fetched
: res
.fetched
,
// Persist the uncompressed variant into the pool and link it in place.
337 let locked
= &config
.pool
.lock()?
;
338 if let Some(uncompressed
) = uncompressed
{
339 if !locked
.contains(&uncompressed
.checksums
) {
340 locked
.add_file(decompressed
, &uncompressed
.checksums
, config
.sync
)?
;
343 // Ensure it's linked at current path
344 let uncompressed_path
= get_dist_path(&config
.repository
, prefix
, &uncompressed
.path
);
345 locked
.link_file(&uncompressed
.checksums
, &uncompressed_path
)?
;
351 /// Helper to fetch arbitrary files like binary packages.
353 /// Will skip fetching if matching file already exists locally, in which case it will just be re-linked under the new path.
355 /// If need_data is false and the mirror config is set to skip verification, reading the file's content will be skipped as well if fetching was skipped.
// NOTE(review): several original lines are missing from this extracted chunk
// (remaining signature parameters such as `url`/`file`/`need_data`/`dry_run`,
// some branch bodies, the final return); comments cover visible fragments only.
357 config
: &ParsedMirrorConfig
,
361 checksums
: &CheckSums
,
364 ) -> Result
<FetchResult
, Error
> {
365 let locked
= &config
.pool
.lock()?
;
// Pool hit: re-use the stored file instead of downloading again.
366 let res
= if locked
.contains(checksums
) {
367 if need_data
|| config
.verify
{
369 .get_contents(checksums
, config
.verify
)
370 .map(|data
| FetchResult { data, fetched: 0 }
)?
372 // performance optimization for .deb files if verify is false
373 // we never need the file contents and they make up the bulk of a repo
379 } else if dry_run
&& !need_data
{
// Pool miss: actually download, then store into the pool.
385 let fetched
= fetch_repo_file(
390 config
.auth
.as_deref(),
392 locked
.add_file(fetched
.data_ref(), checksums
, config
.verify
)?
;
397 // Ensure it's linked at current path
398 locked
.link_file(checksums
, file
)?
;
404 /// Initialize a new mirror (by creating the corresponding pool).
405 pub fn init(config
: &MirrorConfig
) -> Result
<(), Error
> {
406 let pool_dir
= format
!("{}/.pool", config
.base_dir
);
408 let dir
= format
!("{}/{}", config
.base_dir
, config
.id
);
410 Pool
::create(Path
::new(&dir
), Path
::new(&pool_dir
))?
;
414 /// Destroy a mirror (by destroying the corresponding pool's link dir followed by GC).
415 pub fn destroy(config
: &MirrorConfig
) -> Result
<(), Error
> {
416 let pool
: Pool
= pool(config
)?
;
417 pool
.lock()?
.destroy()?
;
// List the snapshots of a mirror by scanning its directory.
// NOTE(review): several original lines are missing from this extracted chunk
// (scandir arguments at 433-435, parts of the closure body, the final return),
// so comments below only describe the visible fragments.
423 pub fn list_snapshots(config
: &MirrorConfig
) -> Result
<Vec
<Snapshot
>, Error
> {
// Opened only to ensure the mirror/pool exists before scanning.
424 let _pool
: Pool
= pool(config
)?
;
426 let mut list
: Vec
<Snapshot
> = vec
![];
428 let dir
= mirror_dir(config
);
430 let path
= Path
::new(&dir
);
432 proxmox_sys
::fs
::scandir(
436 |_l2_fd
, snapshot
, file_type
| {
// Only directories can be snapshots; skip everything else.
437 if file_type
!= nix
::dir
::Type
::Directory
{
// Parse the directory name into a `Snapshot`.
441 list
.push(snapshot
.parse()?
);
447 list
.sort_unstable();
452 /// Create a new snapshot of the remote repository, fetching and storing files as needed.
454 /// Operates in three phases:
455 /// - Fetch and verify release files
456 /// - Fetch referenced indices according to config
457 /// - Fetch binary packages referenced by package indices
459 /// Files will be linked in a temporary directory and only renamed to the final, valid snapshot directory at the end. In case of error, leftover `XXX.tmp` directories at the top level of `base_dir` can be safely removed once the next snapshot was successfully created, as they only contain hardlinks.
// NOTE(review): this extracted chunk is missing a large number of original
// lines (parameters such as `snapshot`/`dry_run`, match arms, closing braces,
// struct fields); the comments below only annotate the visible fragments.
460 pub fn create_snapshot(
461 config
: MirrorConfig
,
463 subscription
: Option
<SubscriptionKey
>,
465 ) -> Result
<(), Error
> {
// Derive the HTTP auth header from the subscription key, if the repository
// requires one and the key's product matches.
466 let auth
= if let Some(product
) = &config
.use_subscription
{
470 "Mirror {} requires a subscription key, but none given.",
474 Some(key
) if key
.product() == *product
=> {
475 let base64
= base64
::encode(format
!("{}:{}", key
.key
, key
.server_id
));
476 Some(format
!("basic {base64}"))
480 "Repository product type '{}' and key product type '{}' don't match.",
// Convert the raw config into the parsed form (opens pool, parses repo, ...).
490 let mut config
: ParsedMirrorConfig
= config
.try_into()?
;
// Work in a `<snapshot>.tmp` directory; renamed to the final name at the end.
493 let prefix
= format
!("{snapshot}.tmp");
494 let prefix
= Path
::new(&prefix
);
496 let mut total_progress
= Progress
::new();
// Closure: parse raw release-file data and report its reference count.
498 let parse_release
= |res
: FetchResult
, name
: &str| -> Result
<ReleaseFile
, Error
> {
499 println
!("Parsing {name}..");
500 let parsed
: ReleaseFile
= res
.data
[..].try_into()?
;
502 "'{name}' file has {} referenced files..",
// Phase 1: fetch both detached (Release/Release.gpg) and inline (InRelease).
508 // we want both on-disk for compat reasons
509 let res
= fetch_release(&config
, prefix
, true, dry_run
)?
;
510 total_progress
.update(&res
);
511 let _release
= parse_release(res
, "Release")?
;
513 let res
= fetch_release(&config
, prefix
, false, dry_run
)?
;
514 total_progress
.update(&res
);
515 let release
= parse_release(res
, "InRelease")?
;
// Group wanted index references per component; collect skipped ones.
517 let mut per_component
= HashMap
::new();
518 let mut others
= Vec
::new();
522 .contains(&APTRepositoryPackageType
::Deb
);
526 .contains(&APTRepositoryPackageType
::DebSrc
);
528 for (basename
, references
) in &release
.files
{
529 let reference
= references
.first();
530 let reference
= if let Some(reference
) = reference
{
// Skip references for components not configured for this mirror.
535 let skip_components
= !&config
.repository
.components
.contains(&reference
.component
);
537 let skip
= skip_components
538 || match &reference
.file_type
{
539 FileReferenceType
::Ignored
=> true,
540 FileReferenceType
::PDiff
=> true, // would require fetching the patches as well
541 FileReferenceType
::Sources(_
) => !source
,
// Architecture-specific references are only kept if binaries are wanted
// and the architecture is configured.
543 if let Some(arch
) = reference
.file_type
.architecture() {
544 !binary
|| !config
.architectures
.contains(arch
)
551 println
!("Skipping {}", reference
.path
);
552 others
.push(reference
);
554 let list
= per_component
555 .entry(reference
.component
)
556 .or_insert_with(Vec
::new
);
// Print summary statistics about the selected index references.
562 let mut indices_size
= 0_usize
;
563 let mut total_count
= 0;
565 for (component
, references
) in &per_component
{
566 println
!("Component '{component}'");
568 let mut component_indices_size
= 0;
570 for basename
in references
{
571 for reference
in release
.files
.get(*basename
).unwrap() {
572 println
!("\t{:?}: {:?}", reference
.path
, reference
.file_type
);
573 component_indices_size
+= reference
.size
;
576 indices_size
+= component_indices_size
;
578 let component_count
= references
.len();
579 total_count
+= component_count
;
581 println
!("Component references count: {component_count}");
582 println
!("Component indices size: {component_indices_size}");
583 if references
.is_empty() {
584 println
!("\tNo references found..");
587 println
!("Total indices count: {total_count}");
588 println
!("Total indices size: {indices_size}");
590 if !others
.is_empty() {
591 println
!("Skipped {} references", others
.len());
// Phase 2: fetch the selected index files, per component.
595 let mut packages_size
= 0_usize
;
596 let mut packages_indices
= HashMap
::new();
597 let mut failed_references
= Vec
::new();
598 for (component
, references
) in per_component
{
599 println
!("\nFetching indices for component '{component}'");
600 let mut component_deb_size
= 0;
601 let mut fetch_progress
= Progress
::new();
603 for basename
in references
{
604 println
!("\tFetching '{basename}'..");
605 let files
= release
.files
.get(basename
).unwrap();
// The reference whose path equals the basename is the uncompressed one.
606 let uncompressed_ref
= files
.iter().find(|reference
| reference
.path
== *basename
);
608 let mut package_index_data
= None
;
610 for reference
in files
{
611 // if both compressed and uncompressed are referenced, the uncompressed file may not exist on the server
612 if Some(reference
) == uncompressed_ref
&& files
.len() > 1 {
616 // this will ensure the uncompressed file will be written locally
617 let res
= match fetch_index_file(
622 release
.aquire_by_hash
,
// Non-package-index fetch failures are recorded but non-fatal.
626 Err(err
) if !reference
.file_type
.is_package_index() => {
628 "Failed to fetch '{:?}' type reference '{}', skipping - {err}",
629 reference
.file_type
, reference
.path
631 failed_references
.push(reference
);
// Package-index fetch failures abort the snapshot.
634 Err(err
) => bail
!(err
),
636 fetch_progress
.update(&res
);
// Keep the first package index's data for parsing below.
638 if package_index_data
.is_none() && reference
.file_type
.is_package_index() {
639 package_index_data
= Some(res
.data());
642 if let Some(data
) = package_index_data
{
643 let packages
: PackagesFile
= data
[..].try_into()?
;
644 let size
: usize = packages
.files
.iter().map(|p
| p
.size
).sum();
645 println
!("\t{} packages totalling {size}", packages
.files
.len());
646 component_deb_size
+= size
;
648 packages_indices
.entry(basename
).or_insert(packages
);
650 println
!("Progress: {fetch_progress}");
652 println
!("Total deb size for component: {component_deb_size}");
653 packages_size
+= component_deb_size
;
654 total_progress
+= fetch_progress
;
656 println
!("Total deb size: {packages_size}");
657 if !failed_references
.is_empty() {
658 eprintln
!("Failed to download non-package-index references:");
659 for reference
in failed_references
{
660 eprintln
!("\t{}", reference
.path
);
// Phase 3: fetch the packages referenced by the parsed package indices.
664 println
!("\nFetching packages..");
665 let mut dry_run_progress
= Progress
::new();
666 for (basename
, references
) in packages_indices
{
667 let total_files
= references
.files
.len();
668 if total_files
== 0 {
669 println
!("\n{basename} - no files, skipping.");
672 println
!("\n{basename} - {total_files} total file(s)");
675 let mut fetch_progress
= Progress
::new();
676 for package
in references
.files
{
677 let url
= get_repo_url(&config
.repository
, &package
.file
);
// Dry-run: only report what would be fetched, based on pool contents.
680 if config
.pool
.contains(&package
.checksums
) {
681 fetch_progress
.update(&FetchResult
{
686 println
!("\t(dry-run) GET missing '{url}' ({}b)", package
.size
);
687 fetch_progress
.update(&FetchResult
{
689 fetched
: package
.size
,
693 let mut full_path
= PathBuf
::from(prefix
);
694 full_path
.push(&package
.file
);
696 match fetch_plain_file(
705 Ok(res
) => fetch_progress
.update(&res
),
// With `ignore_errors` set, report the failure and keep going.
706 Err(err
) if config
.ignore_errors
=> {
708 "{}: failed to fetch package '{}' - {}",
709 basename
, package
.file
, err
,
// Print progress roughly every 1% of files (at least every file).
719 if fetch_progress
.file_count() % (max(total_files
/ 100, 1)) == 0 {
720 println
!("\tProgress: {fetch_progress}");
723 println
!("\tProgress: {fetch_progress}");
725 dry_run_progress
+= fetch_progress
;
727 total_progress
+= fetch_progress
;
732 println
!("\nDry-run Stats (indices, downloaded but not persisted):\n{total_progress}");
733 println
!("\nDry-run stats (packages, new == missing):\n{dry_run_progress}");
735 println
!("\nStats: {total_progress}");
// Finalize: atomically rename `<snapshot>.tmp` to the final snapshot name.
739 println
!("Rotating temp. snapshot in-place: {prefix:?} -> \"{snapshot}\"");
740 let locked
= config
.pool
.lock()?
;
741 locked
.rename(prefix
, Path
::new(&format
!("{snapshot}")))?
;
747 /// Remove a snapshot by removing the corresponding snapshot directory. To actually free up space, a garbage collection needs to be run afterwards.
748 pub fn remove_snapshot(config
: &MirrorConfig
, snapshot
: &Snapshot
) -> Result
<(), Error
> {
749 let pool
: Pool
= pool(config
)?
;
750 let path
= pool
.get_path(Path
::new(&snapshot
.to_string()))?
;
752 pool
.lock()?
.remove_dir(&path
)
755 /// Run a garbage collection on the underlying pool.
756 pub fn gc(config
: &MirrorConfig
) -> Result
<(usize, u64), Error
> {
757 let pool
: Pool
= pool(config
)?
;