// proxmox-backup: src/api2/tape/backup.rs
use std::path::Path;
use std::sync::{Mutex, Arc};

use anyhow::{bail, format_err, Error};
use serde_json::Value;

use proxmox::{
    try_block,
    api::{
        api,
        RpcEnvironment,
        RpcEnvironmentType,
        Router,
        Permission,
    },
};

use pbs_api_types::{
    Authid, Userid, TapeBackupJobConfig, TapeBackupJobSetup, TapeBackupJobStatus, MediaPoolConfig,
    UPID_SCHEMA, JOB_ID_SCHEMA, PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT, PRIV_TAPE_WRITE,
};

use pbs_datastore::{task_log, task_warn, StoreProgress};
use pbs_datastore::backup_info::{BackupDir, BackupInfo};
use pbs_datastore::task::TaskState;
use pbs_config::CachedUserInfo;

use crate::{
    server::{
        lookup_user_email,
        TapeBackupJobSummary,
        jobstate::{
            Job,
            JobState,
            compute_schedule_status,
        },
    },
    backup::{DataStore, SnapshotReader},
    server::WorkerTask,
    tape::{
        TAPE_STATUS_DIR,
        Inventory,
        PoolWriter,
        MediaPool,
        drive::{
            media_changer,
            lock_tape_device,
            TapeLockError,
            set_tape_device_state,
        },
        changer::update_changer_online_status,
    },
};

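// Routing: the "id" sub-router below runs a single configured job manually,
// while the main router lists configured jobs (GET) and starts an ad-hoc backup (POST).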
const TAPE_BACKUP_JOB_ROUTER: Router = Router::new()
    .post(&API_METHOD_RUN_TAPE_BACKUP_JOB);

pub const ROUTER: Router = Router::new()
    .get(&API_METHOD_LIST_TAPE_BACKUP_JOBS)
    .post(&API_METHOD_BACKUP)
    .match_all("id", &TAPE_BACKUP_JOB_ROUTER);

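/// Check that `auth_id` has Datastore.Read on the datastore and Tape.Write
/// on both the tape drive and the media pool.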
fn check_backup_permission(
    auth_id: &Authid,
    store: &str,
    pool: &str,
    drive: &str,
) -> Result<(), Error> {

    let user_info = CachedUserInfo::new()?;

    let privs = user_info.lookup_privs(auth_id, &["datastore", store]);
    if (privs & PRIV_DATASTORE_READ) == 0 {
        bail!("no permissions on /datastore/{}", store);
    }

    let privs = user_info.lookup_privs(auth_id, &["tape", "drive", drive]);
    if (privs & PRIV_TAPE_WRITE) == 0 {
        bail!("no permissions on /tape/drive/{}", drive);
    }

    let privs = user_info.lookup_privs(auth_id, &["tape", "pool", pool]);
    if (privs & PRIV_TAPE_WRITE) == 0 {
        bail!("no permissions on /tape/pool/{}", pool);
    }

    Ok(())
}

#[api(
    returns: {
        description: "List configured tape backup jobs and their status",
        type: Array,
        items: { type: TapeBackupJobStatus },
    },
    access: {
        description: "List configured tape jobs filtered by Tape.Audit privileges",
        permission: &Permission::Anybody,
    },
)]
/// List all tape backup jobs
pub fn list_tape_backup_jobs(
    _param: Value,
    mut rpcenv: &mut dyn RpcEnvironment,
) -> Result<Vec<TapeBackupJobStatus>, Error> {
    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
    let user_info = CachedUserInfo::new()?;

    let (job_config, digest) = pbs_config::tape_job::config()?;
    let (pool_config, _pool_digest) = pbs_config::media_pool::config()?;
    let (drive_config, _digest) = pbs_config::drive::config()?;

    let job_list_iter = job_config
        .convert_to_typed_array("backup")?
        .into_iter()
        .filter(|_job: &TapeBackupJobConfig| {
            // fixme: check access permission
            true
        });
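    // Note: per-job visibility is enforced in the loop below via the
    // Tape.Audit privilege on /tape/job/{id}.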

    let mut list = Vec::new();
    let status_path = Path::new(TAPE_STATUS_DIR);
    let current_time = proxmox::tools::time::epoch_i64();

    for job in job_list_iter {
        let privs = user_info.lookup_privs(&auth_id, &["tape", "job", &job.id]);
        if (privs & PRIV_TAPE_AUDIT) == 0 {
            continue;
        }

        let last_state = JobState::load("tape-backup-job", &job.id)
            .map_err(|err| format_err!("could not open statefile for {}: {}", &job.id, err))?;

        let status = compute_schedule_status(&last_state, job.schedule.as_deref())?;

        let next_run = status.next_run.unwrap_or(current_time);

        let mut next_media_label = None;

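        // best-effort guess of the medium the next run would write to;
        // any errors here are simply ignored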
        if let Ok(pool) = pool_config.lookup::<MediaPoolConfig>("pool", &job.setup.pool) {
            let mut changer_name = None;
            if let Ok(Some((_, name))) = media_changer(&drive_config, &job.setup.drive) {
                changer_name = Some(name);
            }
            if let Ok(mut pool) = MediaPool::with_config(status_path, &pool, changer_name, true) {
                if pool.start_write_session(next_run, false).is_ok() {
                    if let Ok(media_id) = pool.guess_next_writable_media(next_run) {
                        next_media_label = Some(media_id.label.label_text);
                    }
                }
            }
        }

        list.push(TapeBackupJobStatus { config: job, status, next_media_label });
    }

    rpcenv["digest"] = proxmox::tools::digest_to_hex(&digest).into();

    Ok(list)
}

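/// Run a tape backup job inside a worker task.
///
/// For manual runs the drive lock is passed in already acquired; for scheduled
/// runs (`schedule` is `Some`) the lock is acquired inside the worker, waiting
/// until the drive becomes available. After the backup a status notification
/// is sent to the configured notify user (or root@pam) if an email address is known.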
pub fn do_tape_backup_job(
    mut job: Job,
    setup: TapeBackupJobSetup,
    auth_id: &Authid,
    schedule: Option<String>,
) -> Result<String, Error> {

    let job_id = format!("{}:{}:{}:{}",
        setup.store,
        setup.pool,
        setup.drive,
        job.jobname());

    let worker_type = job.jobtype().to_string();

    let datastore = DataStore::lookup_datastore(&setup.store)?;

    let (config, _digest) = pbs_config::media_pool::config()?;
    let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?;

    let (drive_config, _digest) = pbs_config::drive::config()?;

    // for scheduled jobs we acquire the lock later in the worker
    let drive_lock = if schedule.is_some() {
        None
    } else {
        Some(lock_tape_device(&drive_config, &setup.drive)?)
    };

    let notify_user = setup.notify_user.as_ref().unwrap_or_else(|| &Userid::root_userid());
    let email = lookup_user_email(notify_user);

    let upid_str = WorkerTask::new_thread(
        &worker_type,
        Some(job_id.clone()),
        auth_id.clone(),
        false,
        move |worker| {
            job.start(&worker.upid().to_string())?;
            let mut drive_lock = drive_lock;

            let mut summary = Default::default();
            let job_result = try_block!({
                if schedule.is_some() {
                    // for scheduled tape backup jobs, we wait indefinitely for the lock
                    task_log!(worker, "waiting for drive lock...");
                    loop {
                        worker.check_abort()?;
                        match lock_tape_device(&drive_config, &setup.drive) {
                            Ok(lock) => {
                                drive_lock = Some(lock);
                                break;
                            }
                            Err(TapeLockError::TimeOut) => continue,
                            Err(TapeLockError::Other(err)) => return Err(err),
                        }
                    }
                }
                set_tape_device_state(&setup.drive, &worker.upid().to_string())?;

                task_log!(worker, "Starting tape backup job '{}'", job_id);
                if let Some(event_str) = schedule {
                    task_log!(worker, "task triggered by schedule '{}'", event_str);
                }

                backup_worker(
                    &worker,
                    datastore,
                    &pool_config,
                    &setup,
                    email.clone(),
                    &mut summary,
                    false,
                )
            });

            let status = worker.create_state(&job_result);

            if let Some(email) = email {
                if let Err(err) = crate::server::send_tape_backup_status(
                    &email,
                    Some(job.jobname()),
                    &setup,
                    &job_result,
                    summary,
                ) {
                    eprintln!("send tape backup notification failed: {}", err);
                }
            }

            if let Err(err) = job.finish(status) {
                eprintln!(
                    "could not finish job state for {}: {}",
                    job.jobtype().to_string(),
                    err
                );
            }

            if let Err(err) = set_tape_device_state(&setup.drive, "") {
                eprintln!(
                    "could not unset drive state for {}: {}",
                    setup.drive,
                    err
                );
            }

            job_result
        }
    )?;

    Ok(upid_str)
}

#[api(
    input: {
        properties: {
            id: {
                schema: JOB_ID_SCHEMA,
            },
        },
    },
    access: {
        // Note: the parameters come from the job config, so we need to check permissions inside the function body
        description: "The user needs Tape.Write privilege on /tape/pool/{pool} \
            and /tape/drive/{drive}, Datastore.Read privilege on /datastore/{store}.",
        permission: &Permission::Anybody,
    },
)]
/// Runs a tape backup job manually.
pub fn run_tape_backup_job(
    id: String,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {
    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;

    let (config, _digest) = pbs_config::tape_job::config()?;
    let backup_job: TapeBackupJobConfig = config.lookup("backup", &id)?;

    check_backup_permission(
        &auth_id,
        &backup_job.setup.store,
        &backup_job.setup.pool,
        &backup_job.setup.drive,
    )?;

    let job = Job::new("tape-backup-job", &id)?;

    let upid_str = do_tape_backup_job(job, backup_job.setup, &auth_id, None)?;

    Ok(upid_str)
}

#[api(
    input: {
        properties: {
            setup: {
                type: TapeBackupJobSetup,
                flatten: true,
            },
            "force-media-set": {
                description: "Ignore the allocation policy and start a new media-set.",
                optional: true,
                type: bool,
                default: false,
            },
        },
    },
    returns: {
        schema: UPID_SCHEMA,
    },
    access: {
        // Note: the parameters are not URI parameters, so we need to check permissions inside the function body
        description: "The user needs Tape.Write privilege on /tape/pool/{pool} \
            and /tape/drive/{drive}, Datastore.Read privilege on /datastore/{store}.",
        permission: &Permission::Anybody,
    },
)]
/// Backup datastore to tape media pool
pub fn backup(
    setup: TapeBackupJobSetup,
    force_media_set: bool,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;

    check_backup_permission(
        &auth_id,
        &setup.store,
        &setup.pool,
        &setup.drive,
    )?;

    let datastore = DataStore::lookup_datastore(&setup.store)?;

    let (config, _digest) = pbs_config::media_pool::config()?;
    let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?;

    let (drive_config, _digest) = pbs_config::drive::config()?;

    // early check/lock before starting worker
    let drive_lock = lock_tape_device(&drive_config, &setup.drive)?;

    let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;

    let job_id = format!("{}:{}:{}", setup.store, setup.pool, setup.drive);

    let notify_user = setup.notify_user.as_ref().unwrap_or_else(|| &Userid::root_userid());
    let email = lookup_user_email(notify_user);

    let upid_str = WorkerTask::new_thread(
        "tape-backup",
        Some(job_id),
        auth_id,
        to_stdout,
        move |worker| {
            let _drive_lock = drive_lock; // keep lock guard
            set_tape_device_state(&setup.drive, &worker.upid().to_string())?;

            let mut summary = Default::default();
            let job_result = backup_worker(
                &worker,
                datastore,
                &pool_config,
                &setup,
                email.clone(),
                &mut summary,
                force_media_set,
            );

            if let Some(email) = email {
                if let Err(err) = crate::server::send_tape_backup_status(
                    &email,
                    None,
                    &setup,
                    &job_result,
                    summary,
                ) {
                    eprintln!("send tape backup notification failed: {}", err);
                }
            }

            // ignore errors
            let _ = set_tape_device_state(&setup.drive, "");
            job_result
        }
    )?;

    Ok(upid_str.into())
}

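/// The actual tape backup worker: writes all (or, with `latest-only`, just the
/// newest) finished snapshots of every backup group to the media pool, appends
/// the media catalog when anything was written, and finally exports or ejects
/// the media set if requested in the setup.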
fn backup_worker(
    worker: &WorkerTask,
    datastore: Arc<DataStore>,
    pool_config: &MediaPoolConfig,
    setup: &TapeBackupJobSetup,
    email: Option<String>,
    summary: &mut TapeBackupJobSummary,
    force_media_set: bool,
) -> Result<(), Error> {

    let status_path = Path::new(TAPE_STATUS_DIR);
    let start = std::time::Instant::now();

    task_log!(worker, "update media online status");
    let changer_name = update_media_online_status(&setup.drive)?;

    let pool = MediaPool::with_config(status_path, &pool_config, changer_name, false)?;

    let mut pool_writer = PoolWriter::new(
        pool,
        &setup.drive,
        worker,
        email,
        force_media_set
    )?;

    let mut group_list = BackupInfo::list_backup_groups(&datastore.base_path())?;

    group_list.sort_unstable();

    let group_count = group_list.len();
    task_log!(worker, "found {} groups", group_count);

    let mut progress = StoreProgress::new(group_count as u64);

    let latest_only = setup.latest_only.unwrap_or(false);

    if latest_only {
        task_log!(worker, "latest-only: true (only considering latest snapshots)");
    }

    let datastore_name = datastore.name();

    let mut errors = false;

    let mut need_catalog = false; // avoid writing catalog for empty jobs

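    // Iterate over all backup groups; within each group, either only the newest
    // finished snapshot (latest-only) or all finished snapshots that are not yet
    // part of the media set are written to tape.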
    for (group_number, group) in group_list.into_iter().enumerate() {
        progress.done_groups = group_number as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let snapshot_list = group.list_backups(&datastore.base_path())?;

        // filter out unfinished backups
        let mut snapshot_list: Vec<_> = snapshot_list
            .into_iter()
            .filter(|item| item.is_finished())
            .collect();

        if snapshot_list.is_empty() {
            task_log!(worker, "group {} was empty", group);
            continue;
        }

        BackupInfo::sort_list(&mut snapshot_list, true); // oldest first

        if latest_only {
            progress.group_snapshots = 1;
            if let Some(info) = snapshot_list.pop() {
                if pool_writer.contains_snapshot(datastore_name, &info.backup_dir.to_string()) {
                    task_log!(worker, "skip snapshot {}", info.backup_dir);
                    continue;
                }

                need_catalog = true;

                let snapshot_name = info.backup_dir.to_string();
                if !backup_snapshot(worker, &mut pool_writer, datastore.clone(), info.backup_dir)? {
                    errors = true;
                } else {
                    summary.snapshot_list.push(snapshot_name);
                }
                progress.done_snapshots = 1;
                task_log!(
                    worker,
                    "percentage done: {}",
                    progress
                );
            }
        } else {
            progress.group_snapshots = snapshot_list.len() as u64;
            for (snapshot_number, info) in snapshot_list.into_iter().enumerate() {
                if pool_writer.contains_snapshot(datastore_name, &info.backup_dir.to_string()) {
                    task_log!(worker, "skip snapshot {}", info.backup_dir);
                    continue;
                }

                need_catalog = true;

                let snapshot_name = info.backup_dir.to_string();
                if !backup_snapshot(worker, &mut pool_writer, datastore.clone(), info.backup_dir)? {
                    errors = true;
                } else {
                    summary.snapshot_list.push(snapshot_name);
                }
                progress.done_snapshots = snapshot_number as u64 + 1;
                task_log!(
                    worker,
                    "percentage done: {}",
                    progress
                );
            }
        }
    }

    pool_writer.commit()?;

    if need_catalog {
        task_log!(worker, "append media catalog");

        let uuid = pool_writer.load_writable_media(worker)?;
        let done = pool_writer.append_catalog_archive(worker)?;
        if !done {
            task_log!(worker, "catalog does not fit on tape, writing to next volume");
            pool_writer.set_media_status_full(&uuid)?;
            pool_writer.load_writable_media(worker)?;
            let done = pool_writer.append_catalog_archive(worker)?;
            if !done {
                bail!("write_catalog_archive failed on second media");
            }
        }
    }

    if setup.export_media_set.unwrap_or(false) {
        pool_writer.export_media_set(worker)?;
    } else if setup.eject_media.unwrap_or(false) {
        pool_writer.eject_media(worker)?;
    }

    if errors {
        bail!("Tape backup finished with some errors. Please check the task log.");
    }

    summary.duration = start.elapsed();

    Ok(())
}

// Try to update the media online status.
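// Returns the changer name if the drive belongs to a changer, or None for standalone drives.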
fn update_media_online_status(drive: &str) -> Result<Option<String>, Error> {

    let (config, _digest) = pbs_config::drive::config()?;

    if let Ok(Some((mut changer, changer_name))) = media_changer(&config, drive) {

        let label_text_list = changer.online_media_label_texts()?;

        let status_path = Path::new(TAPE_STATUS_DIR);
        let mut inventory = Inventory::load(status_path)?;

        update_changer_online_status(
            &config,
            &mut inventory,
            &changer_name,
            &label_text_list,
        )?;

        Ok(Some(changer_name))
    } else {
        Ok(None)
    }
}

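/// Write a single snapshot to tape.
///
/// First appends all chunk archives (chunks are read by a separate reader
/// thread), then the snapshot archive itself, switching to the next writable
/// medium whenever the current one becomes full. Returns `Ok(false)` if the
/// snapshot could not be opened (e.g. it vanished in the meantime) so the
/// caller can continue with the remaining snapshots.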
pub fn backup_snapshot(
    worker: &WorkerTask,
    pool_writer: &mut PoolWriter,
    datastore: Arc<DataStore>,
    snapshot: BackupDir,
) -> Result<bool, Error> {

    task_log!(worker, "backup snapshot {}", snapshot);

    let snapshot_reader = match SnapshotReader::new(datastore.clone(), snapshot.clone()) {
        Ok(reader) => reader,
        Err(err) => {
            // ignore missing snapshots and continue
            task_warn!(worker, "failed opening snapshot '{}': {}", snapshot, err);
            return Ok(false);
        }
    };

    let snapshot_reader = Arc::new(Mutex::new(snapshot_reader));

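    // chunks are read by a dedicated reader thread; the returned iterator
    // yields them for writing to tape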
    let (reader_thread, chunk_iter) = pool_writer.spawn_chunk_reader_thread(
        datastore.clone(),
        snapshot_reader.clone(),
    )?;

    let mut chunk_iter = chunk_iter.peekable();

    loop {
        worker.check_abort()?;

        // test if we have remaining chunks
        match chunk_iter.peek() {
            None => break,
            Some(Ok(_)) => { /* Ok */ },
            Some(Err(err)) => bail!("{}", err),
        }

        let uuid = pool_writer.load_writable_media(worker)?;

        worker.check_abort()?;

        let (leom, _bytes) = pool_writer.append_chunk_archive(worker, &mut chunk_iter, datastore.name())?;

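        // logical end of media (LEOM) reached: mark this tape as full so the
        // next iteration loads a fresh writable medium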
        if leom {
            pool_writer.set_media_status_full(&uuid)?;
        }
    }

    if let Err(_) = reader_thread.join() {
        bail!("chunk reader thread failed");
    }

    worker.check_abort()?;

    let uuid = pool_writer.load_writable_media(worker)?;

    worker.check_abort()?;

    let snapshot_reader = snapshot_reader.lock().unwrap();

    let (done, _bytes) = pool_writer.append_snapshot_archive(worker, &snapshot_reader)?;

    if !done {
        // does not fit on tape, so we try on next volume
        pool_writer.set_media_status_full(&uuid)?;

        worker.check_abort()?;

        pool_writer.load_writable_media(worker)?;
        let (done, _bytes) = pool_writer.append_snapshot_archive(worker, &snapshot_reader)?;

        if !done {
            bail!("write_snapshot_archive failed on second media");
        }
    }

    task_log!(worker, "end backup {}:{}", datastore.name(), snapshot);

    Ok(true)
}