]>
Commit | Line | Data |
---|---|---|
9ecde319 FG |
1 | use std::{ |
2 | cmp::max, | |
3 | collections::HashMap, | |
4 | io::Read, | |
5 | path::{Path, PathBuf}, | |
6 | }; | |
7 | ||
8 | use anyhow::{format_err, Error}; | |
9 | use flate2::bufread::GzDecoder; | |
10 | ||
11 | use crate::{config::MirrorConfig, FetchResult, Progress}; | |
12 | use crate::{config::ParsedMirrorConfig, snapshot::Snapshot}; | |
13 | use proxmox_apt::{ | |
14 | deb822::{ | |
15 | CheckSums, CompressionType, FileReference, FileReferenceType, PackagesFile, ReleaseFile, | |
16 | }, | |
17 | repositories::{APTRepository, APTRepositoryPackageType}, | |
18 | }; | |
19 | ||
20 | use crate::helpers; | |
21 | ||
22 | fn get_dist_url(repo: &APTRepository, path: &str) -> String { | |
23 | let dist_root = format!("{}/dists/{}", repo.uris[0], repo.suites[0]); | |
24 | ||
25 | format!("{}/{}", dist_root, path) | |
26 | } | |
27 | ||
28 | fn get_dist_path(repo: &APTRepository, prefix: &Path, path: &str) -> PathBuf { | |
29 | let mut base = PathBuf::from(prefix); | |
30 | base.push("dists"); | |
31 | base.push(&repo.suites[0]); | |
32 | base.push(path); | |
33 | base | |
34 | } | |
35 | ||
36 | fn get_repo_url(repo: &APTRepository, path: &str) -> String { | |
37 | format!("{}/{}", repo.uris[0], path) | |
38 | } | |
39 | ||
40 | fn fetch_repo_file( | |
41 | uri: &str, | |
42 | max_size: Option<u64>, | |
43 | checksums: Option<&CheckSums>, | |
44 | ) -> Result<FetchResult, Error> { | |
45 | println!("-> GET '{}'..", uri); | |
46 | ||
47 | let response = ureq::get(uri).call()?.into_reader(); | |
48 | ||
49 | let mut data = Vec::new(); | |
50 | let bytes = response | |
51 | .take(max_size.unwrap_or(10_000_000)) | |
52 | .read_to_end(&mut data)?; | |
53 | ||
54 | if let Some(checksums) = checksums { | |
55 | checksums.verify(&data)?; | |
56 | } | |
57 | ||
58 | Ok(FetchResult { | |
59 | data, | |
60 | fetched: bytes, | |
61 | }) | |
62 | } | |
63 | ||
64 | fn fetch_release( | |
65 | config: &ParsedMirrorConfig, | |
66 | prefix: &Path, | |
67 | detached: bool, | |
68 | ) -> Result<FetchResult, Error> { | |
69 | let (name, fetched, sig) = if detached { | |
70 | println!("Fetching Release/Release.gpg files"); | |
71 | let sig = fetch_repo_file(&get_dist_url(&config.repository, "Release.gpg"), None, None)?; | |
72 | let mut fetched = fetch_repo_file( | |
73 | &get_dist_url(&config.repository, "Release"), | |
74 | Some(32_000_000), | |
75 | None, | |
76 | )?; | |
77 | fetched.fetched += sig.fetched; | |
78 | ("Release(.gpg)", fetched, Some(sig.data())) | |
79 | } else { | |
80 | println!("Fetching InRelease file"); | |
81 | let fetched = fetch_repo_file( | |
82 | &get_dist_url(&config.repository, "InRelease"), | |
83 | Some(32_000_000), | |
84 | None, | |
85 | )?; | |
86 | ("InRelease", fetched, None) | |
87 | }; | |
88 | ||
89 | println!("Verifying '{name}' signature using provided repository key.."); | |
90 | let content = fetched.data_ref(); | |
91 | let verified = helpers::verify_signature(content, &config.key, sig.as_deref())?; | |
92 | println!("Success"); | |
93 | ||
94 | let sha512 = Some(openssl::sha::sha512(content)); | |
95 | let csums = CheckSums { | |
96 | sha512, | |
97 | ..Default::default() | |
98 | }; | |
99 | ||
100 | let locked = &config.pool.lock()?; | |
101 | ||
102 | if !locked.contains(&csums) { | |
103 | locked.add_file(content, &csums, true)?; | |
104 | } | |
105 | ||
106 | if detached { | |
107 | locked.link_file( | |
108 | &csums, | |
109 | Path::new(&get_dist_path(&config.repository, prefix, "Release")), | |
110 | )?; | |
111 | let sig = sig.unwrap(); | |
112 | let sha512 = Some(openssl::sha::sha512(&sig)); | |
113 | let csums = CheckSums { | |
114 | sha512, | |
115 | ..Default::default() | |
116 | }; | |
117 | if !locked.contains(&csums) { | |
118 | locked.add_file(&sig, &csums, true)?; | |
119 | } | |
120 | locked.link_file( | |
121 | &csums, | |
122 | Path::new(&get_dist_path(&config.repository, prefix, "Release.gpg")), | |
123 | )?; | |
124 | } else { | |
125 | locked.link_file( | |
126 | &csums, | |
127 | Path::new(&get_dist_path(&config.repository, prefix, "InRelease")), | |
128 | )?; | |
129 | } | |
130 | ||
131 | Ok(FetchResult { | |
132 | data: verified, | |
133 | fetched: fetched.fetched, | |
134 | }) | |
135 | } | |
136 | ||
137 | fn fetch_index_file( | |
138 | config: &ParsedMirrorConfig, | |
139 | prefix: &Path, | |
140 | reference: &FileReference, | |
141 | uncompressed: &FileReference, | |
142 | ) -> Result<FetchResult, Error> { | |
143 | let url = get_dist_url(&config.repository, &reference.path); | |
144 | let path = get_dist_path(&config.repository, prefix, &reference.path); | |
145 | let uncompressed_path = get_dist_path(&config.repository, prefix, &uncompressed.path); | |
146 | ||
147 | if config.pool.contains(&reference.checksums) && config.pool.contains(&uncompressed.checksums) { | |
148 | let data = config | |
149 | .pool | |
150 | .get_contents(&uncompressed.checksums, config.verify)?; | |
151 | ||
152 | // Ensure they're linked at current path | |
153 | config.pool.lock()?.link_file(&reference.checksums, &path)?; | |
154 | config | |
155 | .pool | |
156 | .lock()? | |
157 | .link_file(&uncompressed.checksums, &uncompressed_path)?; | |
158 | return Ok(FetchResult { data, fetched: 0 }); | |
159 | } | |
160 | ||
161 | let res = fetch_plain_file(config, &url, &path, &reference.checksums, true)?; | |
162 | ||
163 | let mut buf = Vec::new(); | |
164 | let raw = res.data_ref(); | |
165 | ||
166 | let decompressed = match reference.file_type.compression() { | |
167 | None => raw, | |
168 | Some(CompressionType::Gzip) => { | |
169 | let mut gz = GzDecoder::new(raw); | |
170 | gz.read_to_end(&mut buf)?; | |
171 | &buf[..] | |
172 | } | |
173 | Some(CompressionType::Bzip2) => { | |
174 | let mut bz = bzip2::read::BzDecoder::new(raw); | |
175 | bz.read_to_end(&mut buf)?; | |
176 | &buf[..] | |
177 | } | |
178 | Some(CompressionType::Lzma) | Some(CompressionType::Xz) => { | |
179 | let mut xz = xz2::read::XzDecoder::new(raw); | |
180 | xz.read_to_end(&mut buf)?; | |
181 | &buf[..] | |
182 | } | |
183 | }; | |
184 | ||
185 | let locked = &config.pool.lock()?; | |
186 | if !locked.contains(&uncompressed.checksums) { | |
187 | locked.add_file(decompressed, &uncompressed.checksums, true)?; | |
188 | } | |
189 | ||
190 | // Ensure it's linked at current path | |
191 | locked.link_file(&uncompressed.checksums, &uncompressed_path)?; | |
192 | ||
193 | Ok(FetchResult { | |
194 | data: decompressed.to_owned(), | |
195 | fetched: res.fetched, | |
196 | }) | |
197 | } | |
198 | ||
199 | fn fetch_plain_file( | |
200 | config: &ParsedMirrorConfig, | |
201 | url: &str, | |
202 | file: &Path, | |
203 | checksums: &CheckSums, | |
204 | need_data: bool, | |
205 | ) -> Result<FetchResult, Error> { | |
206 | let locked = &config.pool.lock()?; | |
207 | let res = if locked.contains(checksums) { | |
208 | if need_data || config.verify { | |
209 | locked | |
210 | .get_contents(checksums, config.verify) | |
211 | .map(|data| FetchResult { data, fetched: 0 })? | |
212 | } else { | |
213 | // performance optimization for .deb files if verify is false | |
214 | // we never need the file contents and they make up the bulk of a repo | |
215 | FetchResult { | |
216 | data: vec![], | |
217 | fetched: 0, | |
218 | } | |
219 | } | |
220 | } else { | |
221 | let fetched = fetch_repo_file(url, Some(5_000_000_000), Some(checksums))?; | |
222 | locked.add_file(fetched.data_ref(), checksums, config.verify)?; | |
223 | fetched | |
224 | }; | |
225 | ||
226 | // Ensure it's linked at current path | |
227 | locked.link_file(checksums, file)?; | |
228 | ||
229 | Ok(res) | |
230 | } | |
231 | ||
232 | pub fn mirror(config: MirrorConfig) -> Result<(), Error> { | |
233 | let config: ParsedMirrorConfig = config.try_into()?; | |
234 | let snapshot = Snapshot::now(); | |
235 | ||
236 | let prefix = format!("{snapshot}.tmp"); | |
237 | let prefix = Path::new(&prefix); | |
238 | ||
239 | let mut total_progress = Progress::new(); | |
240 | ||
241 | let parse_release = |res: FetchResult, name: &str| -> Result<ReleaseFile, Error> { | |
242 | println!("Parsing {name}.."); | |
243 | let parsed: ReleaseFile = res.data[..].try_into()?; | |
244 | println!( | |
245 | "'{name}' file has {} referenced files..", | |
246 | parsed.files.len() | |
247 | ); | |
248 | Ok(parsed) | |
249 | }; | |
250 | ||
251 | // we want both on-disk for compat reasons | |
252 | let res = fetch_release(&config, prefix, true)?; | |
253 | total_progress.update(&res); | |
254 | let _release = parse_release(res, "Release")?; | |
255 | ||
256 | let res = fetch_release(&config, prefix, false)?; | |
257 | total_progress.update(&res); | |
258 | let release = parse_release(res, "InRelease")?; | |
259 | ||
260 | let mut per_component = HashMap::new(); | |
261 | let mut others = Vec::new(); | |
262 | let binary = &config | |
263 | .repository | |
264 | .types | |
265 | .contains(&APTRepositoryPackageType::Deb); | |
266 | let source = &config | |
267 | .repository | |
268 | .types | |
269 | .contains(&APTRepositoryPackageType::DebSrc); | |
270 | ||
271 | for (basename, references) in &release.files { | |
272 | let reference = references.first(); | |
273 | let reference = if let Some(reference) = reference { | |
274 | reference.clone() | |
275 | } else { | |
276 | continue; | |
277 | }; | |
278 | let skip_components = !&config.repository.components.contains(&reference.component); | |
279 | ||
280 | let skip = skip_components | |
281 | || match &reference.file_type { | |
282 | FileReferenceType::Ignored => true, | |
283 | FileReferenceType::PDiff => true, // would require fetching the patches as well | |
284 | FileReferenceType::Contents(arch, _) | |
285 | | FileReferenceType::ContentsUdeb(arch, _) | |
286 | | FileReferenceType::Packages(arch, _) | |
287 | | FileReferenceType::PseudoRelease(Some(arch)) => { | |
288 | !binary || !config.architectures.contains(arch) | |
289 | } | |
290 | FileReferenceType::Sources(_) => !source, | |
291 | _ => false, | |
292 | }; | |
293 | if skip { | |
294 | println!("Skipping {}", reference.path); | |
295 | others.push(reference); | |
296 | } else { | |
297 | let list = per_component | |
298 | .entry(reference.component) | |
299 | .or_insert_with(Vec::new); | |
300 | list.push(basename); | |
301 | } | |
302 | } | |
303 | println!(); | |
304 | ||
305 | let mut indices_size = 0_usize; | |
306 | let mut total_count = 0; | |
307 | ||
308 | for (component, references) in &per_component { | |
309 | println!("Component '{component}'"); | |
310 | ||
311 | let mut component_indices_size = 0; | |
312 | ||
313 | for basename in references { | |
314 | for reference in release.files.get(*basename).unwrap() { | |
315 | println!("\t{:?}: {:?}", reference.path, reference.file_type); | |
316 | component_indices_size += reference.size; | |
317 | } | |
318 | } | |
319 | indices_size += component_indices_size; | |
320 | ||
321 | let component_count = references.len(); | |
322 | total_count += component_count; | |
323 | ||
324 | println!("Component references count: {component_count}"); | |
325 | println!("Component indices size: {component_indices_size}"); | |
326 | if references.is_empty() { | |
327 | println!("\tNo references found.."); | |
328 | } | |
329 | } | |
330 | println!("Total indices count: {total_count}"); | |
331 | println!("Total indices size: {indices_size}"); | |
332 | ||
333 | if !others.is_empty() { | |
334 | println!("Skipped {} references", others.len()); | |
335 | } | |
336 | println!(); | |
337 | ||
338 | let mut packages_size = 0_usize; | |
339 | let mut packages_indices = HashMap::new(); | |
340 | for (component, references) in per_component { | |
341 | println!("\nFetching indices for component '{component}'"); | |
342 | let mut component_deb_size = 0; | |
343 | let mut fetch_progress = Progress::new(); | |
344 | ||
345 | for basename in references { | |
346 | println!("\tFetching '{basename}'.."); | |
347 | let files = release.files.get(basename).unwrap(); | |
348 | let uncompressed_ref = files | |
349 | .iter() | |
350 | .find(|reference| reference.path == *basename) | |
351 | .ok_or_else(|| format_err!("Found derived reference without base reference."))?; | |
352 | let mut package_index_data = None; | |
353 | ||
354 | for reference in files { | |
355 | // if both compressed and uncompressed are referenced, the uncompressed file may not exist on the server | |
356 | if reference == uncompressed_ref && files.len() > 1 { | |
357 | continue; | |
358 | } | |
359 | ||
360 | // this will ensure the uncompressed file will be written locally | |
361 | let res = fetch_index_file(&config, prefix, reference, uncompressed_ref)?; | |
362 | fetch_progress.update(&res); | |
363 | ||
364 | if package_index_data.is_none() && reference.file_type.is_package_index() { | |
365 | package_index_data = Some(res.data()); | |
366 | } | |
367 | } | |
368 | if let Some(data) = package_index_data { | |
369 | let packages: PackagesFile = data[..].try_into()?; | |
370 | let size: usize = packages.files.iter().map(|p| p.size).sum(); | |
371 | println!("\t{} packages totalling {size}", packages.files.len()); | |
372 | component_deb_size += size; | |
373 | ||
374 | packages_indices.entry(basename).or_insert(packages); | |
375 | } | |
376 | println!("Progress: {fetch_progress}"); | |
377 | } | |
378 | println!("Total deb size for component: {component_deb_size}"); | |
379 | packages_size += component_deb_size; | |
380 | total_progress += fetch_progress; | |
381 | } | |
382 | println!("Total deb size: {packages_size}"); | |
383 | ||
384 | println!("\nFetching packages.."); | |
385 | for (basename, references) in packages_indices { | |
386 | let total_files = references.files.len(); | |
387 | if total_files == 0 { | |
388 | println!("\n{basename} - no files, skipping."); | |
389 | continue; | |
390 | } else { | |
391 | println!("\n{basename} - {total_files} total file(s)"); | |
392 | } | |
393 | ||
394 | let mut fetch_progress = Progress::new(); | |
395 | for package in references.files { | |
396 | let mut full_path = PathBuf::from(prefix); | |
397 | full_path.push(&package.file); | |
398 | let res = fetch_plain_file( | |
399 | &config, | |
400 | &get_repo_url(&config.repository, &package.file), | |
401 | &full_path, | |
402 | &package.checksums, | |
403 | false, | |
404 | )?; | |
405 | fetch_progress.update(&res); | |
406 | if fetch_progress.file_count() % (max(total_files / 100, 1)) == 0 { | |
407 | println!("\tProgress: {fetch_progress}"); | |
408 | } | |
409 | } | |
410 | println!("\tProgress: {fetch_progress}"); | |
411 | total_progress += fetch_progress; | |
412 | } | |
413 | ||
414 | println!("\nStats: {total_progress}"); | |
415 | ||
416 | println!("Rotating temp. snapshot in-place: {prefix:?} -> \"{snapshot}\""); | |
417 | let locked = config.pool.lock()?; | |
418 | locked.rename(prefix, Path::new(&format!("{snapshot}")))?; | |
419 | ||
420 | Ok(()) | |
421 | } |