]> git.proxmox.com Git - pve-manager-legacy.git/blob - PVE/Replication.pm
PVE::API2::ReplicationConfig - implement delete
[pve-manager-legacy.git] / PVE / Replication.pm
1 package PVE::Replication;
2
3 use warnings;
4 use strict;
5 use Data::Dumper;
6 use JSON;
7 use Time::HiRes qw(gettimeofday tv_interval);
8
9 use PVE::INotify;
10 use PVE::ProcFSTools;
11 use PVE::Tools;
12 use PVE::CalendarEvent;
13 use PVE::Cluster;
14 use PVE::AbstractConfig;
15 use PVE::QemuConfig;
16 use PVE::QemuServer;
17 use PVE::LXC::Config;
18 use PVE::LXC;
19 use PVE::Storage;
20 use PVE::GuestHelpers;
21 use PVE::ReplicationConfig;
22
# Path of the file holding the replication job state. The state is written
# to this file as JSON and read back from it by the helpers in this module.
# Note: regression tests can overwrite $state_path for testing
our $state_path = "/var/lib/pve-manager/pve-replication-state.json";
25
# Record $state as the state of the job described by $jobcfg inside
# $stateobj, then write the complete state object to $state_path as JSON.
#
# Parameters:
#   $stateobj - hash reference: guest id => target id => job state
#   $jobcfg   - job configuration; {type} and {guest} are read here
#   $state    - the job state to store
my $update_job_state = sub {
    my ($stateobj, $jobcfg, $state) = @_;

    my $guest_id = $jobcfg->{guest};

    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
    my $target_id = $plugin->get_unique_target_id($jobcfg);

    # Note: tuple ($guest_id, $target_id) is unique
    $stateobj->{$guest_id}->{$target_id} = $state;

    my $json = encode_json($stateobj);
    PVE::Tools::file_set_contents($state_path, $json);
};
39
# Look up the state of the job described by $jobcfg inside $stateobj.
#
# Returns the stored state hash reference, or a fresh one when nothing is
# stored yet. Missing counters/timestamps are initialized to 0:
#   last_iteration
#   last_try   - last sync start time
#   last_sync  - last successful sync start time
#   fail_count
my $get_job_state = sub {
    my ($stateobj, $jobcfg) = @_;

    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
    my $target_id = $plugin->get_unique_target_id($jobcfg);

    my $state = $stateobj->{$jobcfg->{guest}}->{$target_id} || {};

    for my $field (qw(last_iteration last_try last_sync fail_count)) {
        $state->{$field} //= 0;
    }

    return $state;
};
58
# Load the state object from $state_path.
#
# Returns an empty hash reference when the file does not exist or is empty,
# otherwise the decoded JSON content.
my $read_state = sub {

    if (-e $state_path) {
        my $content = PVE::Tools::file_get_contents($state_path);
        return decode_json($content) if $content ne '';
    }

    return {};
};
69
# Collect the replication jobs that have to be handled by the local node.
#
# Parameters:
#   $stateobj - optional state object; loaded via $read_state when omitted
#
# Returns a hash reference mapping job id => job configuration. Each
# returned job configuration is extended with:
#   {state}     - the job state
#   {id}        - the job id
#   {vmtype}    - the guest type taken from the cluster VM list
#   {next_sync} - time of the next sync (0 means: nothing scheduled)
#
# Jobs are left out when the guest does not exist, the guest is not on the
# local node, the target is the local node, or the job is disabled.
# Dies for any job whose type is not 'local'.
sub job_status {
    my ($stateobj) = @_;

    my $local_node = PVE::INotify::nodename();

    $stateobj = $read_state->() if !$stateobj;

    my $cfg = PVE::ReplicationConfig->new();
    my $vms = PVE::Cluster::get_vmlist();

    my $jobs = {};

    for my $jobid (sort keys %{$cfg->{ids}}) {
        my $jobcfg = $cfg->{ids}->{$jobid};

        die "internal error - not implemented" if $jobcfg->{type} ne 'local';

        my $guest = $vms->{ids}->{$jobcfg->{guest}};

        # skip non existing vms
        next if !$guest;

        # only consider guest on local node
        next if $guest->{node} ne $local_node;

        # never sync to local node
        next if $jobcfg->{target} eq $local_node;

        next if $jobcfg->{disable};

        my $state = $get_job_state->($stateobj, $jobcfg);
        $jobcfg->{state} = $state;
        $jobcfg->{id} = $jobid;
        $jobcfg->{vmtype} = $guest->{type};

        my $fail_count = $state->{fail_count};
        my $next_sync = 0;

        if ($jobcfg->{remove_job}) {
            # lowest possible value
            # todo: consider fail_count? How many retries?
            $next_sync = 1;
        } elsif ($fail_count) {
            # retry with growing delay; no retry is scheduled after 3 failures
            $next_sync = $state->{last_try} + 5*60*$fail_count
                if $fail_count < 3;
        } else {
            my $schedule = $jobcfg->{schedule} || '*/15';
            my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
            $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $state->{last_try}) // 0;
        }

        $jobcfg->{next_sync} = $next_sync;

        $jobs->{$jobid} = $jobcfg;
    }

    return $jobs;
}
129
# Select the next job that is due and mark it as handled in this iteration.
#
# Parameters:
#   $stateobj   - state object (passed on to job_status)
#   $iteration  - marker of the current scheduler iteration; jobs whose
#                 last_iteration is >= this value are not selected again
#   $start_time - current time; a job is due when its next_sync is set and
#                 not later than this value
#
# Returns the job configuration of the selected job (with last_iteration
# already updated and written to the state file), or undef if no job is due.
my $get_next_job = sub {
    my ($stateobj, $iteration, $start_time) = @_;

    my $next_jobid;

    my $jobs = job_status($stateobj);

    # Order: oldest iteration first, then earliest next_sync, then guest id.
    # All three keys are numbers, so they are compared numerically.
    my $sort_func = sub {
        my $joba = $jobs->{$a};
        my $jobb = $jobs->{$b};
        my $sa = $joba->{state};
        my $sb = $jobb->{state};
        my $res = $sa->{last_iteration} <=> $sb->{last_iteration};
        return $res if $res != 0;
        $res = $joba->{next_sync} <=> $jobb->{next_sync};
        return $res if $res != 0;
        return $joba->{guest} <=> $jobb->{guest};
    };

    foreach my $jobid (sort $sort_func keys %$jobs) {
        my $jobcfg = $jobs->{$jobid};
        # already handled in this iteration
        next if $jobcfg->{state}->{last_iteration} >= $iteration;
        if ($jobcfg->{next_sync} && ($start_time >= $jobcfg->{next_sync})) {
            $next_jobid = $jobid;
            last;
        }
    }

    return undef if !$next_jobid;

    my $jobcfg = $jobs->{$next_jobid};

    # remember that this job was picked in the current iteration
    $jobcfg->{state}->{last_iteration} = $iteration;
    $update_job_state->($stateobj, $jobcfg, $jobcfg->{state});

    return $jobcfg;
};
167
# Build the name of the replication snapshot for job $jobid and sync time
# $last_sync.
#
# In list context returns ($prefix, $snapname), where $prefix is the common
# prefix of all replication snapshots of that job. In scalar context returns
# only $snapname.
sub replication_snapshot_name {
    my ($jobid, $last_sync) = @_;

    my $prefix = "replicate_" . $jobid . "_";
    my $snapname = $prefix . $last_sync . "_snap";

    return ($prefix, $snapname) if wantarray;

    return $snapname;
}
176
# Run 'pvesr prepare-local-job' on the remote node via ssh.
#
# Parameters:
#   $ssh_info  - connection info, converted to an ssh command line
#   $jobid     - replication job id
#   $vmid      - guest id
#   $volumes   - array reference of volume ids passed to the remote command
#   $last_sync - value passed as '--last_sync'
#   $force     - when true, '--force' is appended
#
# Each output line of the command is decoded as JSON; the result of the
# last decoded line is returned. Dies if the command produced no result.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $force) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);

    my $cmd = [
        @$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid, $vmid,
        @$volumes,
        '--last_sync', $last_sync,
    ];
    push @$cmd, '--force' if $force;

    my $remote_snapshots;

    PVE::Tools::run_command($cmd, outfunc => sub {
        my ($line) = @_;
        $remote_snapshots = JSON::decode_json($line);
    });

    return $remote_snapshots if defined($remote_snapshots);

    die "prepare remote node failed - no result\n";
}
201
# Run 'pvesr finalize-local-job' on the remote node via ssh, passing the job
# id, the guest id, all volume ids and '--last_sync' $last_sync.
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);

    my $cmd = [@$ssh_cmd, '--', 'pvesr', 'finalize-local-job'];
    push @$cmd, $jobid, $vmid, @$volumes;
    push @$cmd, '--last_sync', $last_sync;

    return PVE::Tools::run_command($cmd);
}
211
# Clean up local replication snapshots of job $jobid on the given volumes.
#
# For every volume in $volids the replication snapshots of the job are
# listed. The snapshot belonging to $last_sync is kept; every other listed
# snapshot is logged as stale and deleted.
#
# Parameters:
#   $storecfg   - storage configuration
#   $volids     - array reference of volume ids
#   $jobid      - replication job id
#   $last_sync  - sync time whose snapshot must be kept (0 keeps none,
#                 as used by the job removal code)
#   $start_time - passed as first argument to $logfunc
#   $logfunc    - log callback, called as $logfunc->($start_time, $msg)
#
# Returns a hash reference with an entry (value 1) for each volume id that
# still has the snapshot of $last_sync.
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $start_time, $logfunc) = @_;

    my ($prefix, $snapname) = replication_snapshot_name($jobid, $last_sync);

    my $last_snapshots = {};
    foreach my $volid (@$volids) {
        # presumably $prefix restricts the list to snapshots of this job
        my $list = PVE::Storage::volume_snapshot_list($storecfg, $volid, $prefix);
        foreach my $snap (@$list) {
            if ($snap eq $snapname) {
                $last_snapshots->{$volid} = 1;
            } else {
                $logfunc->($start_time, "$jobid: delete stale snapshot '$snap' on $volid");
                PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
            }
        }
    }

    return $last_snapshots;
}
233
# Transfer one volume to the target via PVE::Storage::storage_migrate.
#
# The target storage id and volume name are the ones parsed from $volid.
# $base_snapshot and $sync_snapname are passed through to storage_migrate.
# $rate and $insecure are accepted but currently not used.
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure) = @_;

    my ($target_storeid, $target_volname) = PVE::Storage::parse_volume_id($volid);

    # fixme: handle $rate, $insecure ??
    return PVE::Storage::storage_migrate(
        $storecfg, $volid, $ssh_info, $target_storeid, $target_volname,
        $base_snapshot, $sync_snapname);
}
243
# Remove job $jobid from the replication configuration. The configuration
# is loaded, modified and written inside PVE::ReplicationConfig::lock.
sub delete_job {
    my ($jobid) = @_;

    my $remove_from_config = sub {
        my $cfg = PVE::ReplicationConfig->new();
        delete $cfg->{ids}->{$jobid};
        $cfg->write();
    };

    return PVE::ReplicationConfig::lock($remove_from_config);
}
255
# Run one replication job, or remove the job if {remove_job} is set.
#
# Parameters:
#   $jobcfg     - job configuration; reads {type}, {target}, {id}, {guest},
#                 {vmtype}, {remove_job} and {rate}
#   $last_sync  - start time of the last successful sync; names the
#                 snapshot used as base for an incremental sync
#   $start_time - start time of this run; must be greater than $last_sync
#                 and names the new sync snapshot
#   $logfunc    - optional log callback, called as
#                 $logfunc->($start_time, $message)
#
# Dies on error. If creating the snapshots or transferring a volume fails,
# the new local snapshots are deleted (best effort) before the error is
# re-thrown.
sub replicate {
    my ($jobcfg, $last_sync, $start_time, $logfunc) = @_;

    $logfunc = sub {} if !$logfunc; # log nothing by default

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    my $migration_network;
    my $migration_type = 'secure';
    if (my $mc = $dc_conf->{migration}) {
        $migration_network = $mc->{network};
        $migration_type = $mc->{type} if defined($mc->{type});
    }

    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();

    die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
        if $start_time <= $last_sync;

    my $vmid = $jobcfg->{guest};
    my $vmtype = $jobcfg->{vmtype};

    my $conf;
    my $running;
    my $qga;
    my $volumes;

    if ($vmtype eq 'qemu') {
        $conf = PVE::QemuConfig->load_config($vmid);
        $running = PVE::QemuServer::check_running($vmid);
        # the guest agent is only queried for running guests with agent enabled
        $qga = PVE::QemuServer::qga_check_running($vmid)
            if $running && $conf->{agent};
        $volumes = PVE::QemuConfig->get_replicatable_volumes($storecfg, $conf);
    } elsif ($vmtype eq 'lxc') {
        $conf = PVE::LXC::Config->load_config($vmid);
        $running = PVE::LXC::check_running($vmid);
        $volumes = PVE::LXC::Config->get_replicatable_volumes($storecfg, $conf);
    } else {
        die "internal error";
    }

    my $sorted_volids = [ sort keys %$volumes ];

    $logfunc->($start_time, "$jobid: guest => $vmid, type => $vmtype, running => $running");
    $logfunc->($start_time, "$jobid: volumes => " . join(',', @$sorted_volids));

    if (my $remove_job = $jobcfg->{remove_job}) {

        $logfunc->($start_time, "$jobid: start job removal - mode '${remove_job}'");

        if ($remove_job eq 'full') {
            # remove all remote volumes
            remote_prepare_local_job($ssh_info, $jobid, $vmid, [], 0, 1);

        }
        # remove all local replication snapshots (lastsync => 0)
        prepare($storecfg, $sorted_volids, $jobid, 0, $start_time, $logfunc);

        delete_job($jobid); # update config
        $logfunc->($start_time, "$jobid: job removed");

        return;
    }

    # prepare remote side
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $sorted_volids, $last_sync);

    # test if we have a replication_ snapshot from last sync
    # and remove all other/stale replication snapshots
    my $last_sync_snapname = replication_snapshot_name($jobid, $last_sync);
    my $sync_snapname = replication_snapshot_name($jobid, $start_time);

    my $last_snapshots = prepare(
        $storecfg, $sorted_volids, $jobid, $last_sync, $start_time, $logfunc);

    # freeze filesystem for data consistency
    if ($qga) {
        $logfunc->($start_time, "$jobid: freeze guest filesystem");
        PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-freeze");
    }

    # make snapshot of all volumes
    # (errors are caught so that the guest gets unfrozen in any case)
    my $replicate_snapshots = {};
    eval {
        foreach my $volid (@$sorted_volids) {
            $logfunc->($start_time, "$jobid: create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    my $err = $@;

    # unfreeze immediately
    if ($qga) {
        $logfunc->($start_time, "$jobid: unfreeze guest filesystem");
        eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw"); };
        warn $@ if $@; # ignore errors here, because we cannot fix it anyways
    }

    # best-effort deletion of snapshot $snapname on all volumes in $volid_hash
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;
        foreach my $volid (sort keys %$volid_hash) {
            $logfunc->($start_time, "$jobid: delete snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname, $running); };
            warn $@ if $@;
        }
    };

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $err;
    }

    eval {

        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        foreach my $volid (@$sorted_volids) {
            my $base_snapname;
            # incremental sync needs the last snapshot on both sides
            if ($last_snapshots->{$volid} && $remote_snapshots->{$volid}) {
                $logfunc->($start_time, "$jobid: incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
                $base_snapname = $last_sync_snapname;
            } else {
                $logfunc->($start_time, "$jobid: full sync '$volid' ($sync_snapname)");
            }
            replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
        }
    };
    $err = $@;

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $err;
    }

    # remove old snapshots because they are no longer needed
    $cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);

    remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time);

    return;
}
409
# Execute the replication of one job and record the outcome in the state.
#
# Parameters:
#   $stateobj   - state object, updated and written via $update_job_state
#   $jobcfg     - job configuration; its {state} entry is removed and used
#                 as the job state
#   $start_time - start time of this run
#   $logfunc    - optional log callback, called as
#                 $logfunc->($start_time, $message)
#
# The replication itself runs under the guest migration lock. Errors are
# not propagated: they increase {fail_count}, are stored in {error} and are
# logged (or emitted with warn when no $logfunc is given).
my $run_replication = sub {
    my ($stateobj, $jobcfg, $start_time, $logfunc) = @_;

    my $state = delete $jobcfg->{state};

    my $t0 = [gettimeofday];

    # cleanup stale pid/ptime state
    for my $targets (values %$stateobj) {
        for my $stale (values %$targets) {
            delete @{$stale}{qw(pid ptime)};
        }
    }

    $state->{pid} = $$;
    $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid});
    $state->{last_try} = $start_time;
    $update_job_state->($stateobj, $jobcfg, $state);

    if ($logfunc) {
        $logfunc->($start_time, "$jobcfg->{id}: start replication job");
    }

    eval {
        my $timeout = 2; # do not wait too long - we repeat periodically anyways
        PVE::GuestHelpers::guest_migration_lock(
            $jobcfg->{guest}, $timeout, \&replicate,
            $jobcfg, $state->{last_sync}, $start_time, $logfunc);
    };
    my $err = $@;

    $state->{duration} = tv_interval($t0);
    delete @{$state}{qw(pid ptime)};

    if (!$err) {
        if ($logfunc) {
            $logfunc->($start_time, "$jobcfg->{id}: end replication job");
        }
        $state->{last_sync} = $start_time;
        $state->{fail_count} = 0;
        delete $state->{error};
        $update_job_state->($stateobj, $jobcfg, $state);
        return;
    }

    $state->{fail_count}++;
    $state->{error} = "$err";
    $update_job_state->($stateobj, $jobcfg, $state);

    if (!$logfunc) {
        warn $err;
        return;
    }

    chomp $err;
    $logfunc->($start_time, "$jobcfg->{id}: end replication job with error: $err");
    return;
};
463
# Run the replication job $jobid immediately, regardless of its schedule.
#
# Parameters:
#   $jobid   - id of the job to run
#   $now     - optional current time (passing it is useful for regression
#              testing); defaults to time()
#   $logfunc - optional log callback passed on to the replication run
#
# Dies if the job does not exist, is disabled, its guest does not exist or
# is not on the local node, or if the target is the local node. The work is
# done while holding a file lock on $state_path (timeout 60).
sub run_single_job {
    my ($jobid, $now, $logfunc) = @_; # passing $now useful for regression testing

    my $local_node = PVE::INotify::nodename();

    my $code = sub {
        $now //= time();

        my $stateobj = $read_state->();

        my $cfg = PVE::ReplicationConfig->new();

        my $jobcfg = $cfg->{ids}->{$jobid}
            or die "no such job '$jobid'\n";

        die "internal error - not implemented" if $jobcfg->{type} ne 'local';

        die "job '$jobid' is disabled\n" if $jobcfg->{disable};

        my $vms = PVE::Cluster::get_vmlist();
        my $vmid = $jobcfg->{guest};
        my $guest = $vms->{ids}->{$vmid};

        die "no such guest '$vmid'\n" if !$guest;

        die "guest '$vmid' is not on local node\n"
            if $guest->{node} ne $local_node;

        die "unable to sync to local node\n" if $jobcfg->{target} eq $local_node;

        my $state = $get_job_state->($stateobj, $jobcfg);
        $state->{last_iteration} = $now;

        $jobcfg->{state} = $state;
        $jobcfg->{id} = $jobid;
        $jobcfg->{vmtype} = $guest->{type};

        $update_job_state->($stateobj, $jobcfg, $state);

        $run_replication->($stateobj, $jobcfg, $now, $logfunc);
    };

    my $res = PVE::Tools::lock_file($state_path, 60, $code);
    die $@ if $@;
}
506
# Run all replication jobs that are currently due, one after the other.
#
# Parameters:
#   $now     - optional fixed time (useful for regression testing); when
#              not given, time() is used and re-read after each job
#   $logfunc - optional log callback passed on to each replication run
#
# The work is done while holding a file lock on $state_path (timeout 60);
# an error from the locked code is re-thrown.
sub run_jobs {
    my ($now, $logfunc) = @_; # useful for regression testing

    my $iteration = $now // time();

    my $code = sub {
        my $stateobj = $read_state->();
        my $start_time = $now // time();

        while (1) {
            my $jobcfg = $get_next_job->($stateobj, $iteration, $start_time);
            last if !$jobcfg;

            $run_replication->($stateobj, $jobcfg, $start_time, $logfunc);

            $start_time = $now // time();
        }
    };

    my $res = PVE::Tools::lock_file($state_path, 60, $code);
    die $@ if $@;
}
525
526 1;