]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/tape/backup.rs
tape: improve locking (lock media-sets)
[proxmox-backup.git] / src / api2 / tape / backup.rs
1 use std::path::Path;
2 use std::sync::{Mutex, Arc};
3
4 use anyhow::{bail, format_err, Error};
5 use serde_json::Value;
6
7 use proxmox::{
8 try_block,
9 api::{
10 api,
11 RpcEnvironment,
12 RpcEnvironmentType,
13 Router,
14 Permission,
15 },
16 };
17
18 use crate::{
19 task_log,
20 task_warn,
21 config::{
22 self,
23 cached_user_info::CachedUserInfo,
24 acl::{
25 PRIV_DATASTORE_READ,
26 PRIV_TAPE_AUDIT,
27 PRIV_TAPE_WRITE,
28 },
29 tape_job::{
30 TapeBackupJobConfig,
31 TapeBackupJobSetup,
32 TapeBackupJobStatus,
33 },
34 },
35 server::{
36 lookup_user_email,
37 TapeBackupJobSummary,
38 jobstate::{
39 Job,
40 JobState,
41 compute_schedule_status,
42 },
43 },
44 backup::{
45 DataStore,
46 BackupDir,
47 BackupInfo,
48 StoreProgress,
49 },
50 api2::types::{
51 Authid,
52 UPID_SCHEMA,
53 JOB_ID_SCHEMA,
54 MediaPoolConfig,
55 Userid,
56 },
57 server::WorkerTask,
58 task::TaskState,
59 tape::{
60 TAPE_STATUS_DIR,
61 Inventory,
62 PoolWriter,
63 MediaPool,
64 SnapshotReader,
65 drive::{
66 media_changer,
67 lock_tape_device,
68 set_tape_device_state,
69 },
70 changer::update_changer_online_status,
71 },
72 };
73
// Sub-router for one configured tape backup job, addressed by its "id"
// path parameter; POST runs the job manually.
const TAPE_BACKUP_JOB_ROUTER: Router = Router::new()
    .post(&API_METHOD_RUN_TAPE_BACKUP_JOB);

// Top-level tape backup router:
// - GET lists configured tape backup jobs (with status)
// - POST starts an ad-hoc backup from inline setup parameters
// - /{id} dispatches to the per-job sub-router above
pub const ROUTER: Router = Router::new()
    .get(&API_METHOD_LIST_TAPE_BACKUP_JOBS)
    .post(&API_METHOD_BACKUP)
    .match_all("id", &TAPE_BACKUP_JOB_ROUTER);
81
82 fn check_backup_permission(
83 auth_id: &Authid,
84 store: &str,
85 pool: &str,
86 drive: &str,
87 ) -> Result<(), Error> {
88
89 let user_info = CachedUserInfo::new()?;
90
91 let privs = user_info.lookup_privs(auth_id, &["datastore", store]);
92 if (privs & PRIV_DATASTORE_READ) == 0 {
93 bail!("no permissions on /datastore/{}", store);
94 }
95
96 let privs = user_info.lookup_privs(auth_id, &["tape", "drive", drive]);
97 if (privs & PRIV_TAPE_WRITE) == 0 {
98 bail!("no permissions on /tape/drive/{}", drive);
99 }
100
101 let privs = user_info.lookup_privs(auth_id, &["tape", "pool", pool]);
102 if (privs & PRIV_TAPE_WRITE) == 0 {
103 bail!("no permissions on /tape/pool/{}", pool);
104 }
105
106 Ok(())
107 }
108
109 #[api(
110 returns: {
111 description: "List configured thape backup jobs and their status",
112 type: Array,
113 items: { type: TapeBackupJobStatus },
114 },
115 access: {
116 description: "List configured tape jobs filtered by Tape.Audit privileges",
117 permission: &Permission::Anybody,
118 },
119 )]
120 /// List all tape backup jobs
121 pub fn list_tape_backup_jobs(
122 _param: Value,
123 mut rpcenv: &mut dyn RpcEnvironment,
124 ) -> Result<Vec<TapeBackupJobStatus>, Error> {
125 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
126 let user_info = CachedUserInfo::new()?;
127
128 let (config, digest) = config::tape_job::config()?;
129
130 let job_list_iter = config
131 .convert_to_typed_array("backup")?
132 .into_iter()
133 .filter(|_job: &TapeBackupJobConfig| {
134 // fixme: check access permission
135 true
136 });
137
138 let mut list = Vec::new();
139
140 for job in job_list_iter {
141 let privs = user_info.lookup_privs(&auth_id, &["tape", "job", &job.id]);
142 if (privs & PRIV_TAPE_AUDIT) == 0 {
143 continue;
144 }
145
146 let last_state = JobState::load("tape-backup-job", &job.id)
147 .map_err(|err| format_err!("could not open statefile for {}: {}", &job.id, err))?;
148
149 let status = compute_schedule_status(&last_state, job.schedule.as_deref())?;
150
151 list.push(TapeBackupJobStatus { config: job, status });
152 }
153
154 rpcenv["digest"] = proxmox::tools::digest_to_hex(&digest).into();
155
156 Ok(list)
157 }
158
/// Run a configured tape backup job inside a worker task.
///
/// For manual invocations (`schedule == None`) the drive lock is acquired
/// *before* the worker is spawned, so permission/availability errors are
/// reported synchronously to the caller. For scheduled runs the lock is
/// acquired inside the worker, waiting indefinitely until the drive
/// becomes free.
///
/// Returns the UPID string of the spawned worker task.
pub fn do_tape_backup_job(
    mut job: Job,
    setup: TapeBackupJobSetup,
    auth_id: &Authid,
    schedule: Option<String>,
) -> Result<String, Error> {

    let job_id = format!("{}:{}:{}:{}",
        setup.store,
        setup.pool,
        setup.drive,
        job.jobname());

    let worker_type = job.jobtype().to_string();

    let datastore = DataStore::lookup_datastore(&setup.store)?;

    let (config, _digest) = config::media_pool::config()?;
    let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?;

    let (drive_config, _digest) = config::drive::config()?;

    // for scheduled jobs we acquire the lock later in the worker
    let drive_lock = if schedule.is_some() {
        None
    } else {
        Some(lock_tape_device(&drive_config, &setup.drive)?)
    };

    // notify the configured user, falling back to root@pam
    let notify_user = setup.notify_user.as_ref().unwrap_or_else(|| &Userid::root_userid());
    let email = lookup_user_email(notify_user);

    let upid_str = WorkerTask::new_thread(
        &worker_type,
        Some(job_id.clone()),
        auth_id.clone(),
        false,
        move |worker| {
            job.start(&worker.upid().to_string())?;
            // move the (possibly empty) lock guard into the worker so it is
            // held for the whole backup and released when the closure ends
            let mut drive_lock = drive_lock;

            let (job_result, summary) = match try_block!({
                if schedule.is_some() {
                    // for scheduled tape backup jobs, we wait indefinitely for the lock
                    // NOTE(review): retries rely on lock_tape_device blocking/
                    // timing out internally — confirm it does not busy-spin
                    task_log!(worker, "waiting for drive lock...");
                    loop {
                        if let Ok(lock) = lock_tape_device(&drive_config, &setup.drive) {
                            drive_lock = Some(lock);
                            break;
                        } // ignore errors

                        // still honor task abort while waiting
                        worker.check_abort()?;
                    }
                }
                // publish the running task's UPID as the drive state
                set_tape_device_state(&setup.drive, &worker.upid().to_string())?;

                task_log!(worker,"Starting tape backup job '{}'", job_id);
                if let Some(event_str) = schedule {
                    task_log!(worker,"task triggered by schedule '{}'", event_str);
                }

                backup_worker(
                    &worker,
                    datastore,
                    &pool_config,
                    &setup,
                    email.clone(),
                )
            }) {
                Ok(summary) => (Ok(()), summary),
                Err(err) => (Err(err), Default::default()),
            };

            let status = worker.create_state(&job_result);

            // best-effort notification mail; failure only logs to stderr
            if let Some(email) = email {
                if let Err(err) = crate::server::send_tape_backup_status(
                    &email,
                    Some(job.jobname()),
                    &setup,
                    &job_result,
                    summary,
                ) {
                    eprintln!("send tape backup notification failed: {}", err);
                }
            }

            // persist the final job state; failure is non-fatal
            if let Err(err) = job.finish(status) {
                eprintln!(
                    "could not finish job state for {}: {}",
                    job.jobtype().to_string(),
                    err
                );
            }

            // clear the drive state again (best effort)
            if let Err(err) = set_tape_device_state(&setup.drive, "") {
                eprintln!(
                    "could not unset drive state for {}: {}",
                    setup.drive,
                    err
                );
            }

            job_result
        }
    )?;

    Ok(upid_str)
}
268
269 #[api(
270 input: {
271 properties: {
272 id: {
273 schema: JOB_ID_SCHEMA,
274 },
275 },
276 },
277 access: {
278 // Note: parameters are from job config, so we need to test inside function body
279 description: "The user needs Tape.Write privilege on /tape/pool/{pool} \
280 and /tape/drive/{drive}, Datastore.Read privilege on /datastore/{store}.",
281 permission: &Permission::Anybody,
282 },
283 )]
284 /// Runs a tape backup job manually.
285 pub fn run_tape_backup_job(
286 id: String,
287 rpcenv: &mut dyn RpcEnvironment,
288 ) -> Result<String, Error> {
289 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
290
291 let (config, _digest) = config::tape_job::config()?;
292 let backup_job: TapeBackupJobConfig = config.lookup("backup", &id)?;
293
294 check_backup_permission(
295 &auth_id,
296 &backup_job.setup.store,
297 &backup_job.setup.pool,
298 &backup_job.setup.drive,
299 )?;
300
301 let job = Job::new("tape-backup-job", &id)?;
302
303 let upid_str = do_tape_backup_job(job, backup_job.setup, &auth_id, None)?;
304
305 Ok(upid_str)
306 }
307
#[api(
    input: {
        properties: {
            setup: {
                type: TapeBackupJobSetup,
                flatten: true,
            },
        },
    },
    returns: {
        schema: UPID_SCHEMA,
    },
    access: {
        // Note: parameters are no uri parameter, so we need to test inside function body
        description: "The user needs Tape.Write privilege on /tape/pool/{pool} \
            and /tape/drive/{drive}, Datastore.Read privilege on /datastore/{store}.",
        permission: &Permission::Anybody,
    },
)]
/// Backup datastore to tape media pool
///
/// Ad-hoc (non-configured) tape backup: checks permissions, locks the
/// drive before spawning the worker, and returns the worker UPID.
pub fn backup(
    setup: TapeBackupJobSetup,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;

    // parameters come from the request body, so check permissions here
    check_backup_permission(
        &auth_id,
        &setup.store,
        &setup.pool,
        &setup.drive,
    )?;

    let datastore = DataStore::lookup_datastore(&setup.store)?;

    let (config, _digest) = config::media_pool::config()?;
    let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?;

    let (drive_config, _digest) = config::drive::config()?;

    // early check/lock before starting worker
    let drive_lock = lock_tape_device(&drive_config, &setup.drive)?;

    let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;

    let job_id = format!("{}:{}:{}", setup.store, setup.pool, setup.drive);

    // notify the configured user, falling back to root@pam
    let notify_user = setup.notify_user.as_ref().unwrap_or_else(|| &Userid::root_userid());
    let email = lookup_user_email(notify_user);

    let upid_str = WorkerTask::new_thread(
        "tape-backup",
        Some(job_id),
        auth_id,
        to_stdout,
        move |worker| {
            let _drive_lock = drive_lock; // keep lock guard
            // publish the running task's UPID as the drive state
            set_tape_device_state(&setup.drive, &worker.upid().to_string())?;

            let (job_result, summary) = match backup_worker(
                &worker,
                datastore,
                &pool_config,
                &setup,
                email.clone(),
            ) {
                Ok(summary) => (Ok(()), summary),
                Err(err) => (Err(err), Default::default()),
            };

            // best-effort notification mail; failure only logs to stderr
            if let Some(email) = email {
                if let Err(err) = crate::server::send_tape_backup_status(
                    &email,
                    None,
                    &setup,
                    &job_result,
                    summary,
                ) {
                    eprintln!("send tape backup notification failed: {}", err);
                }
            }

            // ignore errors
            let _ = set_tape_device_state(&setup.drive, "");
            job_result
        }
    )?;

    Ok(upid_str.into())
}
399
/// Core tape backup loop, executed inside a worker task.
///
/// Writes all (or, with `latest-only`, the newest) snapshots of every
/// backup group in `datastore` to the media pool, then appends the media
/// catalog and optionally exports/ejects the media set.
///
/// Returns a summary (snapshot list + duration); bails at the end if any
/// individual snapshot failed, so the task is marked as failed while
/// still having backed up as much as possible.
fn backup_worker(
    worker: &WorkerTask,
    datastore: Arc<DataStore>,
    pool_config: &MediaPoolConfig,
    setup: &TapeBackupJobSetup,
    email: Option<String>,
) -> Result<TapeBackupJobSummary, Error> {

    let status_path = Path::new(TAPE_STATUS_DIR);
    let start = std::time::Instant::now();
    let mut summary: TapeBackupJobSummary = Default::default();

    task_log!(worker, "update media online status");
    let changer_name = update_media_online_status(&setup.drive)?;

    let pool = MediaPool::with_config(status_path, &pool_config, changer_name, false)?;

    let mut pool_writer = PoolWriter::new(pool, &setup.drive, worker, email)?;

    let mut group_list = BackupInfo::list_backup_groups(&datastore.base_path())?;

    // deterministic group order
    group_list.sort_unstable();

    let group_count = group_list.len();
    task_log!(worker, "found {} groups", group_count);

    let mut progress = StoreProgress::new(group_count as u64);

    let latest_only = setup.latest_only.unwrap_or(false);

    if latest_only {
        task_log!(worker, "latest-only: true (only considering latest snapshots)");
    }

    let datastore_name = datastore.name();

    // collect per-snapshot failures instead of aborting the whole run
    let mut errors = false;

    for (group_number, group) in group_list.into_iter().enumerate() {
        progress.done_groups = group_number as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let mut snapshot_list = group.list_backups(&datastore.base_path())?;

        BackupInfo::sort_list(&mut snapshot_list, true); // oldest first

        if latest_only {
            // only the newest snapshot of the group (last after oldest-first sort)
            progress.group_snapshots = 1;
            if let Some(info) = snapshot_list.pop() {
                // skip snapshots already on media of this pool
                if pool_writer.contains_snapshot(datastore_name, &info.backup_dir.to_string()) {
                    task_log!(worker, "skip snapshot {}", info.backup_dir);
                    continue;
                }
                let snapshot_name = info.backup_dir.to_string();
                if !backup_snapshot(worker, &mut pool_writer, datastore.clone(), info.backup_dir)? {
                    errors = true;
                } else {
                    summary.snapshot_list.push(snapshot_name);
                }
                progress.done_snapshots = 1;
                task_log!(
                    worker,
                    "percentage done: {}",
                    progress
                );
            }
        } else {
            progress.group_snapshots = snapshot_list.len() as u64;
            for (snapshot_number, info) in snapshot_list.into_iter().enumerate() {
                // skip snapshots already on media of this pool
                if pool_writer.contains_snapshot(datastore_name, &info.backup_dir.to_string()) {
                    task_log!(worker, "skip snapshot {}", info.backup_dir);
                    continue;
                }
                let snapshot_name = info.backup_dir.to_string();
                if !backup_snapshot(worker, &mut pool_writer, datastore.clone(), info.backup_dir)? {
                    errors = true;
                } else {
                    summary.snapshot_list.push(snapshot_name);
                }
                progress.done_snapshots = snapshot_number as u64 + 1;
                task_log!(
                    worker,
                    "percentage done: {}",
                    progress
                );
            }
        }
    }

    pool_writer.commit()?;

    task_log!(worker, "append media catalog");

    // write the catalog; if it does not fit, mark the medium full and
    // retry once on the next volume
    let uuid = pool_writer.load_writable_media(worker)?;
    let done = pool_writer.append_catalog_archive(worker)?;
    if !done {
        task_log!(worker, "catalog does not fit on tape, writing to next volume");
        pool_writer.set_media_status_full(&uuid)?;
        pool_writer.load_writable_media(worker)?;
        let done = pool_writer.append_catalog_archive(worker)?;
        if !done {
            bail!("write_catalog_archive failed on second media");
        }
    }

    if setup.export_media_set.unwrap_or(false) {
        pool_writer.export_media_set(worker)?;
    } else if setup.eject_media.unwrap_or(false) {
        pool_writer.eject_media(worker)?;
    }

    if errors {
        bail!("Tape backup finished with some errors. Please check the task log.");
    }

    summary.duration = start.elapsed();

    Ok(summary)
}
520
521 // Try to update the the media online status
522 fn update_media_online_status(drive: &str) -> Result<Option<String>, Error> {
523
524 let (config, _digest) = config::drive::config()?;
525
526 if let Ok(Some((mut changer, changer_name))) = media_changer(&config, drive) {
527
528 let label_text_list = changer.online_media_label_texts()?;
529
530 let status_path = Path::new(TAPE_STATUS_DIR);
531 let mut inventory = Inventory::load(status_path)?;
532
533 update_changer_online_status(
534 &config,
535 &mut inventory,
536 &changer_name,
537 &label_text_list,
538 )?;
539
540 Ok(Some(changer_name))
541 } else {
542 Ok(None)
543 }
544 }
545
/// Write a single snapshot (chunks + snapshot archive) to tape.
///
/// Returns `Ok(true)` on success and `Ok(false)` when the snapshot could
/// not be opened (e.g. vanished in the meantime) — the caller treats that
/// as a per-snapshot error and continues with the next one. Hard tape
/// errors are returned as `Err`.
pub fn backup_snapshot(
    worker: &WorkerTask,
    pool_writer: &mut PoolWriter,
    datastore: Arc<DataStore>,
    snapshot: BackupDir,
) -> Result<bool, Error> {

    task_log!(worker, "backup snapshot {}", snapshot);

    let snapshot_reader = match SnapshotReader::new(datastore.clone(), snapshot.clone()) {
        Ok(reader) => reader,
        Err(err) => {
            // ignore missing snapshots and continue
            task_warn!(worker, "failed opening snapshot '{}': {}", snapshot, err);
            return Ok(false);
        }
    };

    // shared with the chunk reader thread spawned below
    let snapshot_reader = Arc::new(Mutex::new(snapshot_reader));

    let (reader_thread, chunk_iter) = pool_writer.spawn_chunk_reader_thread(
        datastore.clone(),
        snapshot_reader.clone(),
    )?;

    let mut chunk_iter = chunk_iter.peekable();

    // stream chunk archives to tape; on logical end-of-media, mark the
    // medium full so the next load_writable_media() switches volumes
    loop {
        worker.check_abort()?;

        // test if we have remaining chunks
        match chunk_iter.peek() {
            None => break,
            Some(Ok(_)) => { /* Ok */ },
            Some(Err(err)) => bail!("{}", err),
        }

        let uuid = pool_writer.load_writable_media(worker)?;

        worker.check_abort()?;

        let (leom, _bytes) = pool_writer.append_chunk_archive(worker, &mut chunk_iter, datastore.name())?;

        if leom {
            pool_writer.set_media_status_full(&uuid)?;
        }
    }

    if let Err(_) = reader_thread.join() {
        bail!("chunk reader thread failed");
    }

    worker.check_abort()?;

    let uuid = pool_writer.load_writable_media(worker)?;

    worker.check_abort()?;

    // chunk reader thread has finished, so the lock is free again
    let snapshot_reader = snapshot_reader.lock().unwrap();

    let (done, _bytes) = pool_writer.append_snapshot_archive(worker, &snapshot_reader)?;

    if !done {
        // does not fit on tape, so we try on next volume
        pool_writer.set_media_status_full(&uuid)?;

        worker.check_abort()?;

        pool_writer.load_writable_media(worker)?;
        let (done, _bytes) = pool_writer.append_snapshot_archive(worker, &snapshot_reader)?;

        if !done {
            bail!("write_snapshot_archive failed on second media");
        }
    }

    task_log!(worker, "end backup {}:{}", datastore.name(), snapshot);

    Ok(true)
}