use std::{
    cmp::max,
    collections::HashMap,
    io::Read,
    path::{Path, PathBuf},
};

use anyhow::{bail, format_err, Error};
use flate2::bufread::GzDecoder;
use nix::libc;
use proxmox_http::{client::sync::Client, HttpClient, HttpOptions};
use proxmox_sys::fs::file_get_contents;

use crate::{
    config::{MirrorConfig, SubscriptionKey},
    convert_repo_line,
    pool::Pool,
    types::{Snapshot, SNAPSHOT_REGEX},
    FetchResult, Progress,
};
use proxmox_apt::{
    deb822::{
        CheckSums, CompressionType, FileReference, FileReferenceType, PackagesFile, ReleaseFile,
    },
    repositories::{APTRepository, APTRepositoryPackageType},
};

use crate::helpers;

fn mirror_dir(config: &MirrorConfig) -> String {
    format!("{}/{}", config.base_dir, config.id)
}

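// The pool holds each file's contents exactly once (addressed by checksum); the
// per-mirror directory and its snapshots just hardlink into it. E.g. with
// `base_dir = "/srv/mirror"` and `id = "pbs"` (illustrative values), the mirror
// tree lives at "/srv/mirror/pbs" and the shared pool at "/srv/mirror/.pool".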
pub(crate) fn pool(config: &MirrorConfig) -> Result<Pool, Error> {
    let pool_dir = format!("{}/.pool", config.base_dir);
    Pool::open(Path::new(&mirror_dir(config)), Path::new(&pool_dir))
}

/// `MirrorConfig`, but with some fields converted/parsed into directly usable types.
struct ParsedMirrorConfig {
    pub repository: APTRepository,
    pub architectures: Vec<String>,
    pub pool: Pool,
    pub key: Vec<u8>,
    pub verify: bool,
    pub sync: bool,
    pub auth: Option<String>,
    pub client: Client,
}

impl TryFrom<MirrorConfig> for ParsedMirrorConfig {
    type Error = anyhow::Error;

    fn try_from(config: MirrorConfig) -> Result<Self, Self::Error> {
        let pool = pool(&config)?;

        let repository = convert_repo_line(config.repository.clone())?;

        let key = file_get_contents(Path::new(&config.key_path))?;

        let options = HttpOptions {
            user_agent: Some(format!(
                "proxmox-offline-mirror {}",
                env!("CARGO_PKG_VERSION")
            )),
            ..Default::default()
        };

        let client = Client::new(options);

        Ok(ParsedMirrorConfig {
            repository,
            architectures: config.architectures,
            pool,
            key,
            verify: config.verify,
            sync: config.sync,
            auth: None,
            client,
        })
    }
}

// Helper to get absolute URL for dist-specific relative `path`.
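// E.g. URI "http://deb.example.org/pbs" with suite "bullseye" and path "InRelease"
// (illustrative values) yields "http://deb.example.org/pbs/dists/bullseye/InRelease".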
fn get_dist_url(repo: &APTRepository, path: &str) -> String {
    let dist_root = format!("{}/dists/{}", repo.uris[0], repo.suites[0]);

    format!("{}/{}", dist_root, path)
}

// Helper to get dist-specific path given a `prefix` (snapshot dir) and relative `path`.
fn get_dist_path(repo: &APTRepository, prefix: &Path, path: &str) -> PathBuf {
    let mut base = PathBuf::from(prefix);
    base.push("dists");
    base.push(&repo.suites[0]);
    base.push(path);
    base
}

// Helper to get generic URL given a `repo` and `path`.
fn get_repo_url(repo: &APTRepository, path: &str) -> String {
    format!("{}/{}", repo.uris[0], path)
}

/// Helper to fetch a file from a URI and optionally verify the response's checksum.
///
/// Only fetches and returns data, doesn't store anything anywhere.
fn fetch_repo_file(
    client: &Client,
    uri: &str,
    max_size: usize,
    checksums: Option<&CheckSums>,
    auth: Option<&str>,
) -> Result<FetchResult, Error> {
    println!("-> GET '{}'..", uri);

    let headers = if let Some(auth) = auth {
        let mut map = HashMap::new();
        map.insert("Authorization".to_string(), auth.to_string());
        Some(map)
    } else {
        None
    };

    let response = client.get(uri, headers.as_ref())?;

    let reader: Box<dyn Read> = response.into_body();
    let mut reader = reader.take(max_size as u64);
    let mut data = Vec::new();
    reader.read_to_end(&mut data)?;

    if let Some(checksums) = checksums {
        checksums.verify(&data)?;
    }

    Ok(FetchResult {
        fetched: data.len(),
        data,
    })
}

/// Helper to fetch InRelease (`detached` == false) or Release/Release.gpg (`detached` == true) files from the repository.
///
/// Verifies the contained/detached signature, stores all fetched files under `prefix`, and returns the verified raw release file data.
fn fetch_release(
    config: &ParsedMirrorConfig,
    prefix: &Path,
    detached: bool,
    dry_run: bool,
) -> Result<FetchResult, Error> {
    let (name, fetched, sig) = if detached {
        println!("Fetching Release/Release.gpg files");
        let sig = fetch_repo_file(
            &config.client,
            &get_dist_url(&config.repository, "Release.gpg"),
            1024 * 1024,
            None,
            config.auth.as_deref(),
        )?;
        let mut fetched = fetch_repo_file(
            &config.client,
            &get_dist_url(&config.repository, "Release"),
            256 * 1024 * 1024,
            None,
            config.auth.as_deref(),
        )?;
        fetched.fetched += sig.fetched;
        ("Release(.gpg)", fetched, Some(sig.data()))
    } else {
        println!("Fetching InRelease file");
        let fetched = fetch_repo_file(
            &config.client,
            &get_dist_url(&config.repository, "InRelease"),
            256 * 1024 * 1024,
            None,
            config.auth.as_deref(),
        )?;
        ("InRelease", fetched, None)
    };

    println!("Verifying '{name}' signature using provided repository key..");
    let content = fetched.data_ref();
    let verified = helpers::verify_signature(content, &config.key, sig.as_deref())?;
    println!("Success");

    let sha512 = Some(openssl::sha::sha512(content));
    let csums = CheckSums {
        sha512,
        ..Default::default()
    };

    if dry_run {
        return Ok(FetchResult {
            data: verified,
            fetched: fetched.fetched,
        });
    }

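    // Add the verified content to the pool (if not already present) and hardlink it
    // at its snapshot-relative path(s).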
    let locked = &config.pool.lock()?;

    if !locked.contains(&csums) {
        locked.add_file(content, &csums, config.sync)?;
    }

    if detached {
        locked.link_file(
            &csums,
            Path::new(&get_dist_path(&config.repository, prefix, "Release")),
        )?;
        let sig = sig.unwrap();
        let sha512 = Some(openssl::sha::sha512(&sig));
        let csums = CheckSums {
            sha512,
            ..Default::default()
        };
        if !locked.contains(&csums) {
            locked.add_file(&sig, &csums, config.sync)?;
        }
        locked.link_file(
            &csums,
            Path::new(&get_dist_path(&config.repository, prefix, "Release.gpg")),
        )?;
    } else {
        locked.link_file(
            &csums,
            Path::new(&get_dist_path(&config.repository, prefix, "InRelease")),
        )?;
    }

    Ok(FetchResult {
        data: verified,
        fetched: fetched.fetched,
    })
}

/// Helper to fetch an index file referenced by a `ReleaseFile`.
///
/// Since these usually come in compressed and uncompressed form, with the latter often not actually existing in the source repository as a file, this fetches the referenced file and decompresses it if necessary to obtain a copy of the uncompressed data.
/// Will skip fetching if both references are already available with the expected checksum in the pool, in which case they will just be re-linked under the new path.
///
/// Returns the uncompressed data.
fn fetch_index_file(
    config: &ParsedMirrorConfig,
    prefix: &Path,
    reference: &FileReference,
    uncompressed: Option<&FileReference>,
    by_hash: bool,
    dry_run: bool,
) -> Result<FetchResult, Error> {
    let url = get_dist_url(&config.repository, &reference.path);
    let path = get_dist_path(&config.repository, prefix, &reference.path);

    if let Some(uncompressed) = uncompressed {
        let uncompressed_path = get_dist_path(&config.repository, prefix, &uncompressed.path);

        if config.pool.contains(&reference.checksums)
            && config.pool.contains(&uncompressed.checksums)
        {
            let data = config
                .pool
                .get_contents(&uncompressed.checksums, config.verify)?;

            if dry_run {
                return Ok(FetchResult { data, fetched: 0 });
            }
            // Ensure they're linked at current path
            config.pool.lock()?.link_file(&reference.checksums, &path)?;
            config
                .pool
                .lock()?
                .link_file(&uncompressed.checksums, &uncompressed_path)?;
            return Ok(FetchResult { data, fetched: 0 });
        }
    }

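    // With acquire-by-hash, indices are additionally published under
    // "<dir>/by-hash/<ALGO>/<hex digest>", which stays stable even while the
    // repository is updated in-place, so prefer those URLs and try the canonical
    // path last.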
    let urls = if by_hash {
        let mut urls = Vec::new();
        if let Some((base_url, _file_name)) = url.rsplit_once('/') {
            if let Some(sha512) = reference.checksums.sha512 {
                urls.push(format!("{base_url}/by-hash/SHA512/{}", hex::encode(sha512)));
            }
            if let Some(sha256) = reference.checksums.sha256 {
                urls.push(format!("{base_url}/by-hash/SHA256/{}", hex::encode(sha256)));
            }
        }
        urls.push(url);
        urls
    } else {
        vec![url]
    };

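    // Try the candidate URLs in order, keeping the first successful fetch.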
    let res = urls
        .iter()
        .fold(None, |res, url| match res {
            Some(Ok(res)) => Some(Ok(res)),
            _ => Some(fetch_plain_file(
                config,
                url,
                &path,
                reference.size,
                &reference.checksums,
                true,
                dry_run,
            )),
        })
        .ok_or_else(|| format_err!("Failed to retrieve {}", reference.path))??;

    let mut buf = Vec::new();
    let raw = res.data_ref();

    let decompressed = match reference.file_type.compression() {
        None => raw,
        Some(CompressionType::Gzip) => {
            let mut gz = GzDecoder::new(raw);
            gz.read_to_end(&mut buf)?;
            &buf[..]
        }
        Some(CompressionType::Bzip2) => {
            let mut bz = bzip2::read::BzDecoder::new(raw);
            bz.read_to_end(&mut buf)?;
            &buf[..]
        }
        Some(CompressionType::Lzma) | Some(CompressionType::Xz) => {
            let mut xz = xz2::read::XzDecoder::new_multi_decoder(raw);
            xz.read_to_end(&mut buf)?;
            &buf[..]
        }
    };
    let res = FetchResult {
        data: decompressed.to_owned(),
        fetched: res.fetched,
    };

    if dry_run {
        return Ok(res);
    }

    let locked = &config.pool.lock()?;
    if let Some(uncompressed) = uncompressed {
        if !locked.contains(&uncompressed.checksums) {
            locked.add_file(decompressed, &uncompressed.checksums, config.sync)?;
        }

        // Ensure it's linked at current path
        let uncompressed_path = get_dist_path(&config.repository, prefix, &uncompressed.path);
        locked.link_file(&uncompressed.checksums, &uncompressed_path)?;
    }

    Ok(res)
}

/// Helper to fetch arbitrary files like binary packages.
///
/// Will skip fetching if a matching file already exists locally, in which case it will just be re-linked under the new path.
///
/// If `need_data` is false and the mirror config is set to skip verification, reading the file's content will be skipped as well if fetching was skipped.
fn fetch_plain_file(
    config: &ParsedMirrorConfig,
    url: &str,
    file: &Path,
    max_size: usize,
    checksums: &CheckSums,
    need_data: bool,
    dry_run: bool,
) -> Result<FetchResult, Error> {
    let locked = &config.pool.lock()?;
    let res = if locked.contains(checksums) {
        if need_data || config.verify {
            locked
                .get_contents(checksums, config.verify)
                .map(|data| FetchResult { data, fetched: 0 })?
        } else {
            // performance optimization for .deb files if verify is false
            // we never need the file contents and they make up the bulk of a repo
            FetchResult {
                data: vec![],
                fetched: 0,
            }
        }
    } else if dry_run && !need_data {
        FetchResult {
            data: vec![],
            fetched: 0,
        }
    } else {
        let fetched = fetch_repo_file(
            &config.client,
            url,
            max_size,
            Some(checksums),
            config.auth.as_deref(),
        )?;
        locked.add_file(fetched.data_ref(), checksums, config.sync)?;
        fetched
    };

    if !dry_run {
        // Ensure it's linked at current path
        locked.link_file(checksums, file)?;
    }

    Ok(res)
}


/// Initialize a new mirror (by creating the corresponding pool).
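///
/// # Example
///
/// ```ignore
/// // Illustrative sketch only: obtaining the `MirrorConfig` (normally read from
/// // the mirror config file) is elided here.
/// let config: MirrorConfig = todo!("load mirror config");
/// init(&config)?;
/// ```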
pub fn init(config: &MirrorConfig) -> Result<(), Error> {
    let pool_dir = format!("{}/.pool", config.base_dir);

    let dir = mirror_dir(config);

    Pool::create(Path::new(&dir), Path::new(&pool_dir))?;
    Ok(())
}

/// Destroy a mirror (by destroying the corresponding pool's link dir followed by GC).
pub fn destroy(config: &MirrorConfig) -> Result<(), Error> {
    let pool: Pool = pool(config)?;
    pool.lock()?.destroy()?;

    Ok(())
}

/// List a mirror's snapshots.
pub fn list_snapshots(config: &MirrorConfig) -> Result<Vec<Snapshot>, Error> {
    let _pool: Pool = pool(config)?;

    let mut list: Vec<Snapshot> = vec![];

    let dir = mirror_dir(config);

    let path = Path::new(&dir);

    proxmox_sys::fs::scandir(
        libc::AT_FDCWD,
        path,
        &SNAPSHOT_REGEX,
        |_l2_fd, snapshot, file_type| {
            if file_type != nix::dir::Type::Directory {
                return Ok(());
            }

            list.push(snapshot.parse()?);

            Ok(())
        },
    )?;

    list.sort_unstable();

    Ok(list)
}

/// Create a new snapshot of the remote repository, fetching and storing files as needed.
///
/// Operates in three phases:
/// - Fetch and verify release files
/// - Fetch referenced indices according to config
/// - Fetch binary packages referenced by package indices
///
/// Files will be linked in a temporary directory and only renamed to the final, valid snapshot
/// directory at the end. In case of error, leftover `XXX.tmp` directories at the top level of
/// `base_dir` can be safely removed once the next snapshot has been successfully created, as they
/// only contain hardlinks.
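///
/// # Example
///
/// ```ignore
/// // Illustrative sketch: config loading is elided, and `Snapshot::now()` is
/// // assumed here as a way to obtain a timestamp-based snapshot name.
/// let config: MirrorConfig = todo!("load mirror config");
/// let snapshot = Snapshot::now();
/// create_snapshot(config, &snapshot, None, false)?;
/// ```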
pub fn create_snapshot(
    config: MirrorConfig,
    snapshot: &Snapshot,
    subscription: Option<SubscriptionKey>,
    dry_run: bool,
) -> Result<(), Error> {
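    // Repositories gated behind a subscription expect HTTP basic auth, with the
    // subscription key as user and the server ID as password.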
    let auth = if let Some(product) = &config.use_subscription {
        match subscription {
            None => {
                bail!(
                    "Mirror {} requires a subscription key, but none was given.",
                    config.id
                );
            }
            Some(key) if key.product() == *product => {
                let base64 = base64::encode(format!("{}:{}", key.key, key.server_id));
                Some(format!("basic {base64}"))
            }
            Some(key) => {
                bail!(
                    "Repository product type '{}' and key product type '{}' don't match.",
                    product,
                    key.product()
                );
            }
        }
    } else {
        None
    };

    let mut config: ParsedMirrorConfig = config.try_into()?;
    config.auth = auth;

    let prefix = format!("{snapshot}.tmp");
    let prefix = Path::new(&prefix);

    let mut total_progress = Progress::new();

    let parse_release = |res: FetchResult, name: &str| -> Result<ReleaseFile, Error> {
        println!("Parsing {name}..");
        let parsed: ReleaseFile = res.data[..].try_into()?;
        println!(
            "'{name}' file has {} referenced files..",
            parsed.files.len()
        );
        Ok(parsed)
    };

    // we want both on-disk for compat reasons
    let res = fetch_release(&config, prefix, true, dry_run)?;
    total_progress.update(&res);
    let _release = parse_release(res, "Release")?;

    let res = fetch_release(&config, prefix, false, dry_run)?;
    total_progress.update(&res);
    let release = parse_release(res, "InRelease")?;

    let mut per_component = HashMap::new();
    let mut others = Vec::new();
    let binary = config
        .repository
        .types
        .contains(&APTRepositoryPackageType::Deb);
    let source = config
        .repository
        .types
        .contains(&APTRepositoryPackageType::DebSrc);

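    // `release.files` maps each index's base name to all referenced variants (e.g.
    // a Packages file plus its compressed forms); classify every reference as
    // wanted or skipped based on the configured components, types and architectures.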
    for (basename, references) in &release.files {
        let reference = references.first();
        let reference = if let Some(reference) = reference {
            reference.clone()
        } else {
            continue;
        };
        let skip_components = !config.repository.components.contains(&reference.component);

        let skip = skip_components
            || match &reference.file_type {
                FileReferenceType::Ignored => true,
                FileReferenceType::PDiff => true, // would require fetching the patches as well
                FileReferenceType::Sources(_) => !source,
                _ => {
                    if let Some(arch) = reference.file_type.architecture() {
                        !binary || !config.architectures.contains(arch)
                    } else {
                        false
                    }
                }
            };
        if skip {
            println!("Skipping {}", reference.path);
            others.push(reference);
        } else {
            let list = per_component
                .entry(reference.component)
                .or_insert_with(Vec::new);
            list.push(basename);
        }
    }
    println!();

    let mut indices_size = 0_usize;
    let mut total_count = 0;

    for (component, references) in &per_component {
        println!("Component '{component}'");

        let mut component_indices_size = 0;

        for basename in references {
            for reference in release.files.get(*basename).unwrap() {
                println!("\t{:?}: {:?}", reference.path, reference.file_type);
                component_indices_size += reference.size;
            }
        }
        indices_size += component_indices_size;

        let component_count = references.len();
        total_count += component_count;

        println!("Component references count: {component_count}");
        println!("Component indices size: {component_indices_size}");
        if references.is_empty() {
            println!("\tNo references found..");
        }
    }
    println!("Total indices count: {total_count}");
    println!("Total indices size: {indices_size}");

    if !others.is_empty() {
        println!("Skipped {} references", others.len());
    }
    println!();

    let mut packages_size = 0_usize;
    let mut packages_indices = HashMap::new();
    let mut failed_references = Vec::new();
    for (component, references) in per_component {
        println!("\nFetching indices for component '{component}'");
        let mut component_deb_size = 0;
        let mut fetch_progress = Progress::new();

        for basename in references {
            println!("\tFetching '{basename}'..");
            let files = release.files.get(basename).unwrap();
            let uncompressed_ref = files.iter().find(|reference| reference.path == *basename);

            let mut package_index_data = None;

            for reference in files {
                // if both compressed and uncompressed are referenced, the uncompressed file may not exist on the server
                if Some(reference) == uncompressed_ref && files.len() > 1 {
                    continue;
                }

                // this will ensure the uncompressed file will be written locally
                let res = match fetch_index_file(
                    &config,
                    prefix,
                    reference,
                    uncompressed_ref,
                    release.aquire_by_hash,
                    dry_run,
                ) {
                    Ok(res) => res,
                    Err(err) if !reference.file_type.is_package_index() => {
                        eprintln!(
                            "Failed to fetch '{:?}' type reference '{}', skipping - {err}",
                            reference.file_type, reference.path
                        );
                        failed_references.push(reference);
                        continue;
                    }
                    Err(err) => bail!(err),
                };
                fetch_progress.update(&res);

                if package_index_data.is_none() && reference.file_type.is_package_index() {
                    package_index_data = Some(res.data());
                }
            }
            if let Some(data) = package_index_data {
                let packages: PackagesFile = data[..].try_into()?;
                let size: usize = packages.files.iter().map(|p| p.size).sum();
                println!("\t{} packages totalling {size}", packages.files.len());
                component_deb_size += size;

                packages_indices.entry(basename).or_insert(packages);
            }
            println!("Progress: {fetch_progress}");
        }
        println!("Total deb size for component: {component_deb_size}");
        packages_size += component_deb_size;
        total_progress += fetch_progress;
    }
    println!("Total deb size: {packages_size}");
    if !failed_references.is_empty() {
        eprintln!("Failed to download non-package-index references:");
        for reference in failed_references {
            eprintln!("\t{}", reference.path);
        }
    }

    println!("\nFetching packages..");
    let mut dry_run_progress = Progress::new();
    for (basename, references) in packages_indices {
        let total_files = references.files.len();
        if total_files == 0 {
            println!("\n{basename} - no files, skipping.");
            continue;
        } else {
            println!("\n{basename} - {total_files} total file(s)");
        }

        let mut fetch_progress = Progress::new();
        for package in references.files {
            let url = get_repo_url(&config.repository, &package.file);

            if dry_run {
                if config.pool.contains(&package.checksums) {
                    fetch_progress.update(&FetchResult {
                        data: vec![],
                        fetched: 0,
                    });
                } else {
                    println!("\t(dry-run) GET missing '{url}' ({}b)", package.size);
                    fetch_progress.update(&FetchResult {
                        data: vec![],
                        fetched: package.size,
                    });
                }
            } else {
                let mut full_path = PathBuf::from(prefix);
                full_path.push(&package.file);

                let res = fetch_plain_file(
                    &config,
                    &url,
                    &full_path,
                    package.size,
                    &package.checksums,
                    false,
                    dry_run,
                )?;
                fetch_progress.update(&res);
            }

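            // print a progress update roughly every 1% of the files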
            if fetch_progress.file_count() % (max(total_files / 100, 1)) == 0 {
                println!("\tProgress: {fetch_progress}");
            }
        }
        println!("\tProgress: {fetch_progress}");
        if dry_run {
            dry_run_progress += fetch_progress;
        } else {
            total_progress += fetch_progress;
        }
    }

    if dry_run {
        println!("\nDry-run stats (indices, downloaded but not persisted):\n{total_progress}");
        println!("\nDry-run stats (packages, new == missing):\n{dry_run_progress}");
    } else {
        println!("\nStats: {total_progress}");
    }

    if !dry_run {
        println!("Rotating temp. snapshot in-place: {prefix:?} -> \"{snapshot}\"");
        let locked = config.pool.lock()?;
        locked.rename(prefix, Path::new(&format!("{snapshot}")))?;
    }

    Ok(())
}

/// Remove a snapshot by removing the corresponding snapshot directory. To actually free up space,
/// a garbage collection needs to be run afterwards.
pub fn remove_snapshot(config: &MirrorConfig, snapshot: &Snapshot) -> Result<(), Error> {
    let pool: Pool = pool(config)?;
    let path = pool.get_path(Path::new(&snapshot.to_string()))?;

    pool.lock()?.remove_dir(&path)
}

/// Run a garbage collection on the underlying pool.
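///
/// Returns the pool's GC summary; going by the return type, the number of removed
/// entries and the amount of space freed in bytes.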
pub fn gc(config: &MirrorConfig) -> Result<(usize, u64), Error> {
    let pool: Pool = pool(config)?;

    pool.lock()?.gc()
}