]> git.proxmox.com Git - proxmox-offline-mirror.git/blame - src/mirror.rs
mirror: handle indices which are only available compressed
[proxmox-offline-mirror.git] / src / mirror.rs
CommitLineData
9ecde319
FG
1use std::{
2 cmp::max,
3 collections::HashMap,
4 io::Read,
5 path::{Path, PathBuf},
6};
7
8b267808 8use anyhow::{bail, format_err, Error};
9ecde319 9use flate2::bufread::GzDecoder;
d035ecb5 10use nix::libc;
49997188 11use proxmox_http::{client::sync::Client, HttpClient, HttpOptions};
d035ecb5
FG
12use proxmox_sys::fs::file_get_contents;
13
14use crate::{
8b267808 15 config::{MirrorConfig, SubscriptionKey},
d035ecb5
FG
16 convert_repo_line,
17 pool::Pool,
18 types::{Snapshot, SNAPSHOT_REGEX},
19 FetchResult, Progress,
20};
9ecde319
FG
21use proxmox_apt::{
22 deb822::{
23 CheckSums, CompressionType, FileReference, FileReferenceType, PackagesFile, ReleaseFile,
24 },
25 repositories::{APTRepository, APTRepositoryPackageType},
26};
27
28use crate::helpers;
29
c598cb15
FG
30fn mirror_dir(config: &MirrorConfig) -> String {
31 format!("{}/{}", config.base_dir, config.id)
32}
33
d035ecb5 34pub(crate) fn pool(config: &MirrorConfig) -> Result<Pool, Error> {
c598cb15
FG
35 let pool_dir = format!("{}/.pool", config.base_dir);
36 Pool::open(Path::new(&mirror_dir(config)), Path::new(&pool_dir))
d035ecb5
FG
37}
38
/// `MirrorConfig`, but some fields converted/parsed into usable types.
struct ParsedMirrorConfig {
    // Parsed APT repository line (single URI/suite entry).
    pub repository: APTRepository,
    // Architectures to mirror (e.g. "amd64"); indices for others are skipped.
    pub architectures: Vec<String>,
    // Content-addressed pool shared by all mirrors under the same base dir.
    pub pool: Pool,
    // Raw contents of the repository signing key file (for signature verification).
    pub key: Vec<u8>,
    // Whether to re-verify checksums when reading files already in the pool.
    pub verify: bool,
    // Whether to fsync files when adding them to the pool.
    pub sync: bool,
    // Optional HTTP Authorization header value (subscription-based repos).
    pub auth: Option<String>,
    // HTTP client used for all fetches.
    pub client: Client,
}
50
51impl TryInto<ParsedMirrorConfig> for MirrorConfig {
52 type Error = anyhow::Error;
53
54 fn try_into(self) -> Result<ParsedMirrorConfig, Self::Error> {
55 let pool = pool(&self)?;
56
57 let repository = convert_repo_line(self.repository.clone())?;
58
59 let key = file_get_contents(Path::new(&self.key_path))?;
60
49997188
FG
61 let options = HttpOptions {
62 user_agent: Some("proxmox-offline-mirror 0.1".to_string()),
63 ..Default::default()
64 }; // TODO actually read version ;)
65
66 let client = Client::new(options);
8b267808 67
d035ecb5
FG
68 Ok(ParsedMirrorConfig {
69 repository,
70 architectures: self.architectures,
71 pool,
72 key,
73 verify: self.verify,
74 sync: self.sync,
8b267808 75 auth: None,
49997188 76 client,
d035ecb5
FG
77 })
78 }
79}
80
2d13dcfc 81// Helper to get absolute URL for dist-specific relative `path`.
9ecde319
FG
82fn get_dist_url(repo: &APTRepository, path: &str) -> String {
83 let dist_root = format!("{}/dists/{}", repo.uris[0], repo.suites[0]);
84
85 format!("{}/{}", dist_root, path)
86}
87
2d13dcfc 88// Helper to get dist-specific path given a `prefix` (snapshot dir) and relative `path`.
9ecde319
FG
89fn get_dist_path(repo: &APTRepository, prefix: &Path, path: &str) -> PathBuf {
90 let mut base = PathBuf::from(prefix);
91 base.push("dists");
92 base.push(&repo.suites[0]);
93 base.push(path);
94 base
95}
96
2d13dcfc 97// Helper to get generic URL given a `repo` and `path`.
9ecde319
FG
98fn get_repo_url(repo: &APTRepository, path: &str) -> String {
99 format!("{}/{}", repo.uris[0], path)
100}
101
2d13dcfc
FG
102/// Helper to fetch file from URI and optionally verify the responses checksum.
103///
104/// Only fetches and returns data, doesn't store anything anywhere.
9ecde319 105fn fetch_repo_file(
49997188 106 client: &Client,
9ecde319 107 uri: &str,
d7e210ac 108 max_size: usize,
9ecde319 109 checksums: Option<&CheckSums>,
8b267808 110 auth: Option<&str>,
9ecde319
FG
111) -> Result<FetchResult, Error> {
112 println!("-> GET '{}'..", uri);
113
49997188
FG
114 let headers = if let Some(auth) = auth {
115 let mut map = HashMap::new();
116 map.insert("Authorization".to_string(), auth.to_string());
117 Some(map)
8b267808 118 } else {
49997188 119 None
8b267808
FG
120 };
121
49997188 122 let response = client.get(uri, headers.as_ref())?;
9ecde319 123
49997188 124 let reader: Box<dyn Read> = response.into_body();
d7e210ac 125 let mut reader = reader.take(max_size as u64);
9ecde319 126 let mut data = Vec::new();
49997188 127 reader.read_to_end(&mut data)?;
9ecde319
FG
128
129 if let Some(checksums) = checksums {
130 checksums.verify(&data)?;
131 }
132
133 Ok(FetchResult {
49997188 134 fetched: data.len(),
9ecde319 135 data,
9ecde319
FG
136 })
137}
138
2d13dcfc
FG
/// Helper to fetch InRelease (`detached` == false) or Release/Release.gpg (`detached` == true) files from repository.
///
/// Verifies the contained/detached signature, stores all fetched files under `prefix`, and returns the verified raw release file data.
fn fetch_release(
    config: &ParsedMirrorConfig,
    prefix: &Path,
    detached: bool,
) -> Result<FetchResult, Error> {
    // fetch either the detached pair (Release + Release.gpg) or the combined
    // InRelease file; `sig` is Some(..) only in the detached case
    let (name, fetched, sig) = if detached {
        println!("Fetching Release/Release.gpg files");
        // signature file is small - 1 MiB cap
        let sig = fetch_repo_file(
            &config.client,
            &get_dist_url(&config.repository, "Release.gpg"),
            1024 * 1024,
            None,
            config.auth.as_deref(),
        )?;
        // release files can be large - 256 MiB cap
        let mut fetched = fetch_repo_file(
            &config.client,
            &get_dist_url(&config.repository, "Release"),
            256 * 1024 * 1024,
            None,
            config.auth.as_deref(),
        )?;
        // account for both downloads in the returned byte count
        fetched.fetched += sig.fetched;
        ("Release(.gpg)", fetched, Some(sig.data()))
    } else {
        println!("Fetching InRelease file");
        let fetched = fetch_repo_file(
            &config.client,
            &get_dist_url(&config.repository, "InRelease"),
            256 * 1024 * 1024,
            None,
            config.auth.as_deref(),
        )?;
        ("InRelease", fetched, None)
    };

    // verify (and for InRelease: extract) the signed payload before anything
    // is stored - `verified` is the raw release data with signature stripped
    println!("Verifying '{name}' signature using provided repository key..");
    let content = fetched.data_ref();
    let verified = helpers::verify_signature(content, &config.key, sig.as_deref())?;
    println!("Success");

    // pool files are content-addressed; a SHA-512 of the fetched bytes is
    // enough to identify them
    let sha512 = Some(openssl::sha::sha512(content));
    let csums = CheckSums {
        sha512,
        ..Default::default()
    };

    let locked = &config.pool.lock()?;

    // only add to the pool if not already present (dedup across snapshots)
    if !locked.contains(&csums) {
        locked.add_file(content, &csums, config.sync)?;
    }

    if detached {
        // link Release, then add + link the detached signature as well
        locked.link_file(
            &csums,
            Path::new(&get_dist_path(&config.repository, prefix, "Release")),
        )?;
        let sig = sig.unwrap();
        let sha512 = Some(openssl::sha::sha512(&sig));
        let csums = CheckSums {
            sha512,
            ..Default::default()
        };
        if !locked.contains(&csums) {
            locked.add_file(&sig, &csums, config.sync)?;
        }
        locked.link_file(
            &csums,
            Path::new(&get_dist_path(&config.repository, prefix, "Release.gpg")),
        )?;
    } else {
        locked.link_file(
            &csums,
            Path::new(&get_dist_path(&config.repository, prefix, "InRelease")),
        )?;
    }

    // return the *verified* payload, not the raw wire bytes
    Ok(FetchResult {
        data: verified,
        fetched: fetched.fetched,
    })
}
224
2d13dcfc
FG
/// Helper to fetch an index file referenced by a `ReleaseFile`.
///
/// Since these usually come in compressed and uncompressed form, with the latter often not actually existing in the source repository as file, this fetches and if necessary decompresses to obtain a copy of the uncompressed data.
/// Will skip fetching if both references are already available with the expected checksum in the pool, in which case they will just be re-linked under the new path.
///
/// Returns the uncompressed data.
fn fetch_index_file(
    config: &ParsedMirrorConfig,
    prefix: &Path,
    reference: &FileReference,
    uncompressed: Option<&FileReference>,
    by_hash: bool,
) -> Result<FetchResult, Error> {
    let url = get_dist_url(&config.repository, &reference.path);
    let path = get_dist_path(&config.repository, prefix, &reference.path);

    // fast path: if both the (compressed) reference and the uncompressed
    // variant are already pooled, just re-link them - no network traffic
    if let Some(uncompressed) = uncompressed {
        let uncompressed_path = get_dist_path(&config.repository, prefix, &uncompressed.path);

        if config.pool.contains(&reference.checksums)
            && config.pool.contains(&uncompressed.checksums)
        {
            let data = config
                .pool
                .get_contents(&uncompressed.checksums, config.verify)?;

            // Ensure they're linked at current path
            config.pool.lock()?.link_file(&reference.checksums, &path)?;
            config
                .pool
                .lock()?
                .link_file(&uncompressed.checksums, &uncompressed_path)?;
            // fetched: 0 - nothing was downloaded
            return Ok(FetchResult { data, fetched: 0 });
        }
    }

    // candidate URLs, strongest first: by-hash/SHA512, by-hash/SHA256, then
    // the plain path as fallback (by-hash entries may already be rotated out)
    let urls = if by_hash {
        let mut urls = Vec::new();
        if let Some((base_url, _file_name)) = url.rsplit_once('/') {
            if let Some(sha512) = reference.checksums.sha512 {
                urls.push(format!("{base_url}/by-hash/SHA512/{}", hex::encode(sha512)));
            }
            if let Some(sha256) = reference.checksums.sha256 {
                urls.push(format!("{base_url}/by-hash/SHA256/{}", hex::encode(sha256)));
            }
        }
        urls.push(url);
        urls
    } else {
        vec![url]
    };

    // try each URL in order, keeping the first success; if every attempt
    // failed, the last Err is propagated (the final `?`), and an empty URL
    // list yields the format_err instead
    let res = urls
        .iter()
        .fold(None, |res, url| match res {
            Some(Ok(res)) => Some(Ok(res)),
            _ => Some(fetch_plain_file(
                config,
                url,
                &path,
                reference.size,
                &reference.checksums,
                true,
            )),
        })
        .ok_or_else(|| format_err!("Failed to retrieve {}", reference.path))??;

    let mut buf = Vec::new();
    let raw = res.data_ref();

    // decompress according to the file extension recorded in the reference;
    // `decompressed` borrows either `raw` (no compression) or `buf`
    let decompressed = match reference.file_type.compression() {
        None => raw,
        Some(CompressionType::Gzip) => {
            let mut gz = GzDecoder::new(raw);
            gz.read_to_end(&mut buf)?;
            &buf[..]
        }
        Some(CompressionType::Bzip2) => {
            let mut bz = bzip2::read::BzDecoder::new(raw);
            bz.read_to_end(&mut buf)?;
            &buf[..]
        }
        Some(CompressionType::Lzma) | Some(CompressionType::Xz) => {
            // multi-decoder: handles concatenated xz streams
            let mut xz = xz2::read::XzDecoder::new_multi_decoder(raw);
            xz.read_to_end(&mut buf)?;
            &buf[..]
        }
    };

    // store + link the uncompressed variant too, so it exists on disk even if
    // the remote repository only serves the compressed file
    let locked = &config.pool.lock()?;
    if let Some(uncompressed) = uncompressed {
        if !locked.contains(&uncompressed.checksums) {
            locked.add_file(decompressed, &uncompressed.checksums, config.sync)?;
        }

        // Ensure it's linked at current path
        let uncompressed_path = get_dist_path(&config.repository, prefix, &uncompressed.path);
        locked.link_file(&uncompressed.checksums, &uncompressed_path)?;
    }

    Ok(FetchResult {
        data: decompressed.to_owned(),
        fetched: res.fetched,
    })
}
330
2d13dcfc
FG
331/// Helper to fetch arbitrary files like binary packages.
332///
333/// Will skip fetching if matching file already exists locally, in which case it will just be re-linked under the new path.
334///
335/// If need_data is false and the mirror config is set to skip verification, reading the file's content will be skipped as well if fetching was skipped.
9ecde319
FG
336fn fetch_plain_file(
337 config: &ParsedMirrorConfig,
338 url: &str,
339 file: &Path,
d7e210ac 340 max_size: usize,
9ecde319
FG
341 checksums: &CheckSums,
342 need_data: bool,
343) -> Result<FetchResult, Error> {
344 let locked = &config.pool.lock()?;
345 let res = if locked.contains(checksums) {
346 if need_data || config.verify {
347 locked
348 .get_contents(checksums, config.verify)
349 .map(|data| FetchResult { data, fetched: 0 })?
350 } else {
351 // performance optimization for .deb files if verify is false
352 // we never need the file contents and they make up the bulk of a repo
353 FetchResult {
354 data: vec![],
355 fetched: 0,
356 }
357 }
358 } else {
8b267808 359 let fetched = fetch_repo_file(
49997188 360 &config.client,
8b267808 361 url,
d7e210ac 362 max_size,
8b267808
FG
363 Some(checksums),
364 config.auth.as_deref(),
365 )?;
9ecde319
FG
366 locked.add_file(fetched.data_ref(), checksums, config.verify)?;
367 fetched
368 };
369
370 // Ensure it's linked at current path
371 locked.link_file(checksums, file)?;
372
373 Ok(res)
374}
375
2d13dcfc 376/// Initialize a new mirror (by creating the corresponding pool).
d035ecb5 377pub fn init(config: &MirrorConfig) -> Result<(), Error> {
c598cb15
FG
378 let pool_dir = format!("{}/.pool", config.base_dir);
379
380 let dir = format!("{}/{}", config.base_dir, config.id);
381
382 Pool::create(Path::new(&dir), Path::new(&pool_dir))?;
d035ecb5
FG
383 Ok(())
384}
385
c598cb15 386/// Destroy a mirror (by destroying the corresponding pool's link dir followed by GC).
d035ecb5
FG
387pub fn destroy(config: &MirrorConfig) -> Result<(), Error> {
388 let pool: Pool = pool(config)?;
389 pool.lock()?.destroy()?;
390
391 Ok(())
392}
393
/// List snapshots
pub fn list_snapshots(config: &MirrorConfig) -> Result<Vec<Snapshot>, Error> {
    // opening the pool validates the mirror's directories exist; the handle
    // itself is not needed for listing
    let _pool: Pool = pool(config)?;

    let mut list: Vec<Snapshot> = vec![];

    let dir = mirror_dir(config);

    let path = Path::new(&dir);

    // scan the mirror dir for entries matching the snapshot naming scheme
    proxmox_sys::fs::scandir(
        libc::AT_FDCWD,
        path,
        &SNAPSHOT_REGEX,
        |_l2_fd, snapshot, file_type| {
            // snapshots are directories - skip anything else (e.g. stray files)
            if file_type != nix::dir::Type::Directory {
                return Ok(());
            }

            list.push(snapshot.parse()?);

            Ok(())
        },
    )?;

    // sort via Snapshot's Ord impl; unstable sort is fine as entries are
    // distinct directory names
    list.sort_unstable();

    Ok(list)
}
423
2d13dcfc
FG
424/// Create a new snapshot of the remote repository, fetching and storing files as needed.
425///
426/// Operates in three phases:
427/// - Fetch and verify release files
428/// - Fetch referenced indices according to config
429/// - Fetch binary packages referenced by package indices
430///
431/// Files will be linked in a temporary directory and only renamed to the final, valid snapshot directory at the end. In case of error, leftover `XXX.tmp` directories at the top level of `base_dir` can be safely removed once the next snapshot was successfully created, as they only contain hardlinks.
8b267808
FG
432pub fn create_snapshot(
433 config: MirrorConfig,
434 snapshot: &Snapshot,
435 subscription: Option<SubscriptionKey>,
436) -> Result<(), Error> {
437 let auth = if let Some(product) = &config.use_subscription {
438 match subscription {
439 None => {
440 bail!(
441 "Mirror {} requires a subscription key, but none given.",
442 config.id
443 );
444 }
445 Some(key) if key.product() == *product => {
446 let base64 = base64::encode(format!("{}:{}", key.key, key.server_id));
447 Some(format!("basic {base64}"))
448 }
449 Some(key) => {
450 bail!(
451 "Repository product type '{}' and key product type '{}' don't match.",
452 product,
453 key.product()
454 );
455 }
456 }
457 } else {
458 None
459 };
460
461 let mut config: ParsedMirrorConfig = config.try_into()?;
462 config.auth = auth;
9ecde319
FG
463
464 let prefix = format!("{snapshot}.tmp");
465 let prefix = Path::new(&prefix);
466
467 let mut total_progress = Progress::new();
468
469 let parse_release = |res: FetchResult, name: &str| -> Result<ReleaseFile, Error> {
470 println!("Parsing {name}..");
471 let parsed: ReleaseFile = res.data[..].try_into()?;
472 println!(
473 "'{name}' file has {} referenced files..",
474 parsed.files.len()
475 );
476 Ok(parsed)
477 };
478
479 // we want both on-disk for compat reasons
480 let res = fetch_release(&config, prefix, true)?;
481 total_progress.update(&res);
482 let _release = parse_release(res, "Release")?;
483
484 let res = fetch_release(&config, prefix, false)?;
485 total_progress.update(&res);
486 let release = parse_release(res, "InRelease")?;
487
488 let mut per_component = HashMap::new();
489 let mut others = Vec::new();
490 let binary = &config
491 .repository
492 .types
493 .contains(&APTRepositoryPackageType::Deb);
494 let source = &config
495 .repository
496 .types
497 .contains(&APTRepositoryPackageType::DebSrc);
498
499 for (basename, references) in &release.files {
500 let reference = references.first();
501 let reference = if let Some(reference) = reference {
502 reference.clone()
503 } else {
504 continue;
505 };
506 let skip_components = !&config.repository.components.contains(&reference.component);
507
508 let skip = skip_components
509 || match &reference.file_type {
510 FileReferenceType::Ignored => true,
511 FileReferenceType::PDiff => true, // would require fetching the patches as well
9ecde319 512 FileReferenceType::Sources(_) => !source,
8a876c01
FG
513 _ => {
514 if let Some(arch) = reference.file_type.architecture() {
515 !binary || !config.architectures.contains(arch)
516 } else {
517 false
518 }
519 }
9ecde319
FG
520 };
521 if skip {
522 println!("Skipping {}", reference.path);
523 others.push(reference);
524 } else {
525 let list = per_component
526 .entry(reference.component)
527 .or_insert_with(Vec::new);
528 list.push(basename);
529 }
530 }
531 println!();
532
533 let mut indices_size = 0_usize;
534 let mut total_count = 0;
535
536 for (component, references) in &per_component {
537 println!("Component '{component}'");
538
539 let mut component_indices_size = 0;
540
541 for basename in references {
542 for reference in release.files.get(*basename).unwrap() {
543 println!("\t{:?}: {:?}", reference.path, reference.file_type);
544 component_indices_size += reference.size;
545 }
546 }
547 indices_size += component_indices_size;
548
549 let component_count = references.len();
550 total_count += component_count;
551
552 println!("Component references count: {component_count}");
553 println!("Component indices size: {component_indices_size}");
554 if references.is_empty() {
555 println!("\tNo references found..");
556 }
557 }
558 println!("Total indices count: {total_count}");
559 println!("Total indices size: {indices_size}");
560
561 if !others.is_empty() {
562 println!("Skipped {} references", others.len());
563 }
564 println!();
565
566 let mut packages_size = 0_usize;
567 let mut packages_indices = HashMap::new();
7829ab74 568 let mut failed_references = Vec::new();
9ecde319
FG
569 for (component, references) in per_component {
570 println!("\nFetching indices for component '{component}'");
571 let mut component_deb_size = 0;
572 let mut fetch_progress = Progress::new();
573
574 for basename in references {
575 println!("\tFetching '{basename}'..");
576 let files = release.files.get(basename).unwrap();
c5fed38d
FG
577 let uncompressed_ref = files.iter().find(|reference| reference.path == *basename);
578
9ecde319
FG
579 let mut package_index_data = None;
580
581 for reference in files {
582 // if both compressed and uncompressed are referenced, the uncompressed file may not exist on the server
c5fed38d 583 if Some(reference) == uncompressed_ref && files.len() > 1 {
9ecde319
FG
584 continue;
585 }
586
587 // this will ensure the uncompressed file will be written locally
8063fd36
FG
588 let res = match fetch_index_file(
589 &config,
590 prefix,
591 reference,
592 uncompressed_ref,
593 release.aquire_by_hash,
594 ) {
7829ab74
FG
595 Ok(res) => res,
596 Err(err) if !reference.file_type.is_package_index() => {
597 eprintln!(
598 "Failed to fetch '{:?}' type reference '{}', skipping - {err}",
599 reference.file_type, reference.path
600 );
601 failed_references.push(reference);
602 continue;
603 }
604 Err(err) => bail!(err),
605 };
9ecde319
FG
606 fetch_progress.update(&res);
607
608 if package_index_data.is_none() && reference.file_type.is_package_index() {
609 package_index_data = Some(res.data());
610 }
611 }
612 if let Some(data) = package_index_data {
613 let packages: PackagesFile = data[..].try_into()?;
614 let size: usize = packages.files.iter().map(|p| p.size).sum();
615 println!("\t{} packages totalling {size}", packages.files.len());
616 component_deb_size += size;
617
618 packages_indices.entry(basename).or_insert(packages);
619 }
620 println!("Progress: {fetch_progress}");
621 }
622 println!("Total deb size for component: {component_deb_size}");
623 packages_size += component_deb_size;
624 total_progress += fetch_progress;
625 }
626 println!("Total deb size: {packages_size}");
7829ab74
FG
627 if !failed_references.is_empty() {
628 eprintln!("Failed to download non-package-index references:");
629 for reference in failed_references {
630 eprintln!("\t{}", reference.path);
631 }
632 }
9ecde319
FG
633
634 println!("\nFetching packages..");
635 for (basename, references) in packages_indices {
636 let total_files = references.files.len();
637 if total_files == 0 {
638 println!("\n{basename} - no files, skipping.");
639 continue;
640 } else {
641 println!("\n{basename} - {total_files} total file(s)");
642 }
643
644 let mut fetch_progress = Progress::new();
645 for package in references.files {
646 let mut full_path = PathBuf::from(prefix);
647 full_path.push(&package.file);
648 let res = fetch_plain_file(
649 &config,
650 &get_repo_url(&config.repository, &package.file),
651 &full_path,
d7e210ac 652 package.size,
9ecde319
FG
653 &package.checksums,
654 false,
655 )?;
656 fetch_progress.update(&res);
657 if fetch_progress.file_count() % (max(total_files / 100, 1)) == 0 {
658 println!("\tProgress: {fetch_progress}");
659 }
660 }
661 println!("\tProgress: {fetch_progress}");
662 total_progress += fetch_progress;
663 }
664
665 println!("\nStats: {total_progress}");
666
667 println!("Rotating temp. snapshot in-place: {prefix:?} -> \"{snapshot}\"");
668 let locked = config.pool.lock()?;
669 locked.rename(prefix, Path::new(&format!("{snapshot}")))?;
670
671 Ok(())
672}
d035ecb5 673
2d13dcfc 674/// Remove a snapshot by removing the corresponding snapshot directory. To actually free up space, a garbage collection needs to be run afterwards.
d035ecb5
FG
675pub fn remove_snapshot(config: &MirrorConfig, snapshot: &Snapshot) -> Result<(), Error> {
676 let pool: Pool = pool(config)?;
677 let path = pool.get_path(Path::new(&snapshot.to_string()))?;
678
679 pool.lock()?.remove_dir(&path)
680}
681
2d13dcfc 682/// Run a garbage collection on the underlying pool.
d035ecb5
FG
683pub fn gc(config: &MirrorConfig) -> Result<(usize, u64), Error> {
684 let pool: Pool = pool(config)?;
685
686 pool.lock()?.gc()
687}