]> git.proxmox.com Git - pve-manager.git/blame - PVE/Replication.pm
use new PVE::ReplicationState::replication_snapshot_name()
[pve-manager.git] / PVE / Replication.pm
CommitLineData
892821fd
DM
1package PVE::Replication;
2
3use warnings;
4use strict;
5use Data::Dumper;
6use JSON;
7use Time::HiRes qw(gettimeofday tv_interval);
8
9use PVE::INotify;
483f89dd 10use PVE::ProcFSTools;
892821fd 11use PVE::Tools;
5c180db3 12use PVE::CalendarEvent;
892821fd 13use PVE::Cluster;
1a9dc09e 14use PVE::AbstractConfig;
892821fd
DM
15use PVE::QemuConfig;
16use PVE::QemuServer;
17use PVE::LXC::Config;
18use PVE::LXC;
19use PVE::Storage;
1a9dc09e 20use PVE::GuestHelpers;
892821fd 21use PVE::ReplicationConfig;
d255af01 22use PVE::ReplicationState;
892821fd 23
dc213d13 24our $pvesr_lock_path = "/var/lock/pvesr.lck";
892821fd 25
# Collect all replication jobs that are relevant for the local node.
#
# A job is included when its guest exists and lives on the local node.
# Jobs that are not marked with 'remove_job' are additionally skipped
# if they target the local node or are disabled.
#
# Returns a hash reference (job ID => job config). Each returned job
# config is extended with the keys 'state', 'id', 'vmtype' and
# 'next_sync'; a 'next_sync' of 0 means that no run is scheduled.
sub job_status {

    my $local_node = PVE::INotify::nodename();

    my $stateobj = PVE::ReplicationState::read_state();

    my $cfg = PVE::ReplicationConfig->new();

    my $vms = PVE::Cluster::get_vmlist();

    my $jobs = {};

    for my $jobid (sort keys %{$cfg->{ids}}) {
        my $jobcfg = $cfg->{ids}->{$jobid};

        die "internal error - not implemented" if $jobcfg->{type} ne 'local';

        # only consider existing guests which are on the local node
        my $vmid = $jobcfg->{guest};
        my $guest = $vms->{ids}->{$vmid};
        next if !$guest || $guest->{node} ne $local_node;

        # jobs scheduled for removal are processed even if they are
        # disabled or point to the local node
        my $remove_job = $jobcfg->{remove_job};
        next if !$remove_job
            && ($jobcfg->{target} eq $local_node || $jobcfg->{disable});

        my $state = PVE::ReplicationState::extract_job_state($stateobj, $jobcfg);
        $jobcfg->{state} = $state;
        $jobcfg->{id} = $jobid;
        $jobcfg->{vmtype} = $guest->{type};

        my $next_sync = 0;

        if ($remove_job) {
            $next_sync = 1; # lowest possible value
            # todo: consider fail_count? How many retries?
        } elsif (my $fail_count = $state->{fail_count}) {
            # retry with a growing delay, but stop after the third failure
            $next_sync = $state->{last_try} + 5*60*$fail_count
                if $fail_count < 3;
        } else {
            my $schedule = $jobcfg->{schedule} || '*/15';
            my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
            $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $state->{last_try}) // 0;
        }

        $jobcfg->{next_sync} = $next_sync;

        $jobs->{$jobid} = $jobcfg;
    }

    return $jobs;
}
86
# Select the next replication job that is due.
#
# Parameters:
#   $iteration  - identifier of the current scheduler run; jobs whose
#                 state records a 'last_iteration' greater or equal to
#                 this value are skipped
#   $start_time - current time; a job is due when its 'next_sync' is
#                 non-zero and not later than this value
#
# Candidates are ordered by 'last_iteration', then 'next_sync', then
# guest ID. Returns the job config of the first due job, or undef if
# there is none.
my $get_next_job = sub {
    my ($iteration, $start_time) = @_;

    my $jobs = job_status();

    my $sort_func = sub {
        my $joba = $jobs->{$a};
        my $jobb = $jobs->{$b};
        my $sa = $joba->{state};
        my $sb = $jobb->{state};
        # 'last_iteration' is numeric, so it must be compared with <=>;
        # a string comparison would sort e.g. 1000 before 900
        my $res = ($sa->{last_iteration} // 0) <=> ($sb->{last_iteration} // 0);
        return $res if $res != 0;
        $res = $joba->{next_sync} <=> $jobb->{next_sync};
        return $res if $res != 0;
        return $joba->{guest} <=> $jobb->{guest};
    };

    foreach my $jobid (sort $sort_func keys %$jobs) {
        my $jobcfg = $jobs->{$jobid};
        # job was already handled in this iteration
        next if $jobcfg->{state}->{last_iteration} >= $iteration;
        if ($jobcfg->{next_sync} && ($start_time >= $jobcfg->{next_sync})) {
            return $jobcfg;
        }
    }

    return undef;
};
114
# Run 'pvesr prepare-local-job' on the remote node described by $ssh_info.
#
# Parameters:
#   $ssh_info  - passed to PVE::Cluster::ssh_info_to_command() to build
#                the ssh command prefix
#   $jobid     - replication job ID
#   $vmid      - guest ID
#   $volumes   - array reference of volume IDs (may be empty)
#   $last_sync - value for the '--last_sync' option
#   $force     - if true, '--force' is appended to the command
#
# Every output line of the command is decoded as JSON; the result of the
# last decoded line is returned. Dies if the command produced no output.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $force) = @_;

    my $cmd = [
        @{ PVE::Cluster::ssh_info_to_command($ssh_info) },
        '--', 'pvesr', 'prepare-local-job', $jobid, $vmid,
        @$volumes,
        '--last_sync', $last_sync,
        ($force ? '--force' : ()),
    ];

    my $remote_snapshots;

    PVE::Tools::run_command($cmd, outfunc => sub {
        my ($line) = @_;
        $remote_snapshots = JSON::decode_json($line);
    });

    defined($remote_snapshots)
        or die "prepare remote node failed - no result\n";

    return $remote_snapshots;
}
139
b11e512f
DM
# Run 'pvesr finalize-local-job' on the remote node described by $ssh_info.
#
# The remote command line consists of the job ID, the guest ID, all
# volume IDs from $volumes and '--last_sync $last_sync'.
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync) = @_;

    my $cmd = [ @{ PVE::Cluster::ssh_info_to_command($ssh_info) }, '--', 'pvesr' ];
    push @$cmd, 'finalize-local-job', $jobid, $vmid;
    push @$cmd, @$volumes, '--last_sync', $last_sync;

    return PVE::Tools::run_command($cmd);
}
149
fae99506
DM
# Remove stale replication snapshots of a job and report which volumes
# still carry the snapshot of the last sync.
#
# Parameters:
#   $storecfg   - storage configuration, passed through to PVE::Storage
#   $volids     - array reference of volume IDs to inspect
#   $jobid      - replication job ID
#   $last_sync  - time used to compute the name of the snapshot to keep
#   $start_time - not used by this function (kept for interface
#                 compatibility)
#   $logfunc    - optional code ref called with one log message string
#
# For each volume, all snapshots listed for the job's snapshot prefix are
# deleted, except the one whose name matches $last_sync.
#
# Returns a hash reference (volume ID => 1) with an entry for each volume
# on which the snapshot of the last sync was found.
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $start_time, $logfunc) = @_;

    $logfunc = sub {} if !$logfunc; # log nothing by default

    my ($prefix, $snapname) =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);

    my $last_snapshots = {};
    foreach my $volid (@$volids) {
        my $list = PVE::Storage::volume_snapshot_list($storecfg, $volid, $prefix);
        foreach my $snap (@$list) {
            if ($snap eq $snapname) {
                $last_snapshots->{$volid} = 1;
            } else {
                $logfunc->("$jobid: delete stale snapshot '$snap' on $volid");
                PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
            }
        }
    }

    return $last_snapshots;
}
f70997ea 172
# Transfer one volume to the node described by $ssh_info using
# PVE::Storage::storage_migrate().
#
# The target storage ID and volume name are taken from $volid itself.
# $base_snapshot and $sync_snapname are passed through unchanged.
# $rate and $insecure are accepted but currently not used.
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure) = @_;

    my ($target_storeid, $target_volname) = PVE::Storage::parse_volume_id($volid);

    # fixme: handle $rate, $insecure ??
    return PVE::Storage::storage_migrate(
        $storecfg, $volid, $ssh_info, $target_storeid, $target_volname,
        $base_snapshot, $sync_snapname);
}
182
f9d38c54
DM
# Remove the job $jobid from the replication configuration.
#
# The configuration is re-read, modified and written back inside the
# callback passed to PVE::ReplicationConfig::lock().
sub delete_job {
    my ($jobid) = @_;

    PVE::ReplicationConfig::lock(sub {
        my $cfg = PVE::ReplicationConfig->new();
        delete $cfg->{ids}->{$jobid};
        $cfg->write();
    });
}
194
7f6ff9dd
DM
# Run one replication (or removal) pass for a single job.
#
# Parameters:
#   $jobcfg     - job configuration hash; this function reads the keys
#                 'id', 'type', 'guest', 'vmtype', 'target', 'remove_job'
#                 and 'rate'
#   $last_sync  - time of the previous sync; used to derive the name of
#                 the snapshot that serves as base for incremental syncs
#   $start_time - time of this run; must be greater than $last_sync and
#                 is used to derive the name of the new snapshot
#   $logfunc    - optional code ref called with one log message string
#
# If the job is marked with 'remove_job', all replication snapshots are
# removed and the job is deleted from the configuration. Otherwise a new
# snapshot is created on every replicatable volume and each volume is
# sent to the target node, incrementally when the previous snapshot
# exists on both sides.
#
# Dies on error. Snapshots created by a failed run are removed locally
# on a best-effort basis before the error is re-thrown.
sub replicate {
    my ($jobcfg, $last_sync, $start_time, $logfunc) = @_;

    $logfunc = sub {} if !$logfunc; # log nothing by default

    my $local_node = PVE::INotify::nodename();

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    my $migration_network;
    my $migration_type = 'secure';
    if (my $mc = $dc_conf->{migration}) {
        $migration_network = $mc->{network};
        $migration_type = $mc->{type} if defined($mc->{type});
    }

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();

    die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
        if $start_time <= $last_sync;

    my $vmid = $jobcfg->{guest};
    my $vmtype = $jobcfg->{vmtype};

    my $conf;
    my $running;
    my $qga;
    my $volumes;

    if ($vmtype eq 'qemu') {
        $conf = PVE::QemuConfig->load_config($vmid);
        $running = PVE::QemuServer::check_running($vmid);
        $qga = PVE::QemuServer::qga_check_running($vmid)
            if $running && $conf->{agent};
        $volumes = PVE::QemuConfig->get_replicatable_volumes($storecfg, $conf);
    } elsif ($vmtype eq 'lxc') {
        $conf = PVE::LXC::Config->load_config($vmid);
        $running = PVE::LXC::check_running($vmid);
        $volumes = PVE::LXC::Config->get_replicatable_volumes($storecfg, $conf);
    } else {
        die "internal error";
    }

    my $sorted_volids = [ sort keys %$volumes ];

    $logfunc->("$jobid: guest => $vmid, type => $vmtype, running => $running");
    $logfunc->("$jobid: volumes => " . join(',', @$sorted_volids));

    if (my $remove_job = $jobcfg->{remove_job}) {

        $logfunc->("$jobid: start job removal - mode '${remove_job}'");

        if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
            # remove all remote volumes
            my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target});
            remote_prepare_local_job($ssh_info, $jobid, $vmid, [], 0, 1);

        }
        # remove all local replication snapshots (lastsync => 0)
        prepare($storecfg, $sorted_volids, $jobid, 0, $start_time, $logfunc);

        delete_job($jobid); # update config
        $logfunc->("$jobid: job removed");

        return;
    }

    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);

    # prepare remote side
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $sorted_volids, $last_sync);

    # test if we have a replication_ snapshot from last sync
    # and remove all other/stale replication snapshots
    my $last_sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
    my $sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);

    my $last_snapshots = prepare(
        $storecfg, $sorted_volids, $jobid, $last_sync, $start_time, $logfunc);

    # freeze filesystem for data consistency
    if ($qga) {
        $logfunc->("$jobid: freeze guest filesystem");
        PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-freeze");
    }

    # make snapshot of all volumes
    my $replicate_snapshots = {};
    eval {
        foreach my $volid (@$sorted_volids) {
            $logfunc->("$jobid: create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    # save the error - it is re-thrown only after the guest is thawed
    my $err = $@;

    # unfreeze immediately
    if ($qga) {
        $logfunc->("$jobid: unfreeze guest filesystem");
        eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw"); };
        warn $@ if $@; # ignore errors here, because we cannot fix it anyways
    }

    # best-effort removal of snapshot $snapname from all volumes in $volid_hash
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;
        foreach my $volid (sort keys %$volid_hash) {
            $logfunc->("$jobid: delete snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname, $running); };
            warn $@ if $@;
        }
    };

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $err;
    }

    eval {

        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        foreach my $volid (@$sorted_volids) {
            my $base_snapname;
            # an incremental sync needs the last snapshot on both sides
            if ($last_snapshots->{$volid} && $remote_snapshots->{$volid}) {
                $logfunc->("$jobid: incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
                $base_snapname = $last_sync_snapname;
            } else {
                $logfunc->("$jobid: full sync '$volid' ($sync_snapname)");
            }
            replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
        }
    };
    $err = $@;

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $err;
    }

    # remove old snapshots because they are no longer needed
    $cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);

    remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time);
}
351
d255af01
DM
# Run replicate() for one job and record the outcome in the job state.
#
# Before the run, the state is updated with the current process
# ('pid', 'ptime'), 'last_node', 'last_try' and 'last_iteration'.
# Afterwards 'duration' is stored, and either 'last_sync' is advanced and
# 'fail_count' reset (success), or 'fail_count' is incremented and
# 'error' set (failure).
#
# Does not die; unexpected errors are reported with warn.
my $run_replication_nolock = sub {
    my ($jobcfg, $iteration, $start_time, $logfunc) = @_;

    # we normally write errors into the state file,
    # but we also catch unexpected errors and log them to syslog
    # (for example when there are problems writing the state file)
    eval {
        my $state = PVE::ReplicationState::read_job_state($jobcfg);

        my $t0 = [gettimeofday];

        # mark the job as running
        $state->{pid} = $$;
        $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid});
        $state->{last_node} = PVE::INotify::nodename();
        $state->{last_try} = $start_time;
        $state->{last_iteration} = $iteration;

        PVE::ReplicationState::write_job_state($jobcfg, $state);

        $logfunc->("$jobcfg->{id}: start replication job") if $logfunc;

        eval { replicate($jobcfg, $state->{last_sync}, $start_time, $logfunc); };
        my $err = $@;

        # the job is no longer running
        $state->{duration} = tv_interval($t0);
        delete @$state{qw(pid ptime)};

        if (!$err) {
            $logfunc->("$jobcfg->{id}: end replication job") if $logfunc;
            $state->{last_sync} = $start_time;
            $state->{fail_count} = 0;
            delete $state->{error};
            PVE::ReplicationState::write_job_state($jobcfg, $state);
            return; # leaves the enclosing eval only
        }

        chomp $err;
        $state->{fail_count}++;
        $state->{error} = "$err";
        PVE::ReplicationState::write_job_state($jobcfg, $state);

        my $msg = "$jobcfg->{id}: end replication job with error: $err";
        if ($logfunc) {
            $logfunc->($msg);
        } else {
            warn "$msg\n";
        }
    };
    if (my $err = $@) {
        warn "$jobcfg->{id}: got unexpected error - $err";
    }
};
405
# Run a replication job while holding the guest migration lock.
#
# If an error occurs (for example because the lock could not be
# acquired), it is re-thrown unless $noerr is true, in which case undef
# is returned instead.
my $run_replication = sub {
    my ($jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_;

    # do not wait too long - we repeat periodically anyways
    my $lock_timeout = 2;

    eval {
        PVE::GuestHelpers::guest_migration_lock(
            $jobcfg->{guest}, $lock_timeout, $run_replication_nolock,
            $jobcfg, $iteration, $start_time, $logfunc);
    };
    if (my $err = $@) {
        die $err if !$noerr;
        return undef;
    }
};
420
# Run the replication job $jobid immediately, regardless of its schedule.
#
# Parameters:
#   $jobid   - ID of the job to run
#   $now     - optional time of the run, defaults to time(); passing it
#              is useful for regression testing
#   $logfunc - optional code ref called with one log message string
#
# Dies if the job does not exist, is disabled, its guest does not exist
# or is not on the local node, or if the job targets the local node.
# The work is done while holding the lock file $pvesr_lock_path.
sub run_single_job {
    my ($jobid, $now, $logfunc) = @_; # passing $now useful for regression testing

    my $local_node = PVE::INotify::nodename();

    my $code = sub {
        $now //= time();

        my $cfg = PVE::ReplicationConfig->new();

        my $jobcfg = $cfg->{ids}->{$jobid}
            or die "no such job '$jobid'\n";

        die "internal error - not implemented" if $jobcfg->{type} ne 'local';

        die "job '$jobid' is disabled\n" if $jobcfg->{disable};

        my $vms = PVE::Cluster::get_vmlist();
        my $vmid = $jobcfg->{guest};

        my $guest = $vms->{ids}->{$vmid}
            or die "no such guest '$vmid'\n";

        die "guest '$vmid' is not on local node\n"
            if $guest->{node} ne $local_node;

        die "unable to sync to local node\n" if $jobcfg->{target} eq $local_node;

        $jobcfg->{id} = $jobid;
        $jobcfg->{vmtype} = $guest->{type};

        # a single run uses the start time as iteration identifier
        $run_replication->($jobcfg, $now, $now, $logfunc);
    };

    my $res = PVE::Tools::lock_file($pvesr_lock_path, 60, $code);
    die $@ if $@;
}
457
# Run all replication jobs that are currently due, one after another.
#
# Parameters:
#   $now     - optional fixed time used as iteration identifier and as
#              start time of every job (useful for regression testing);
#              if undefined, time() is used and re-read before each job
#   $logfunc - optional code ref called with one log message string
#
# The work is done while holding the lock file $pvesr_lock_path. Errors
# of individual jobs are suppressed by passing a true $noerr to
# $run_replication.
sub run_jobs {
    my ($now, $logfunc) = @_; # useful for regression testing

    my $iteration = $now // time();

    my $code = sub {
        while (1) {
            # refresh the start time before selecting each job
            my $start_time = $now // time();

            my $jobcfg = $get_next_job->($iteration, $start_time)
                or last;

            $run_replication->($jobcfg, $iteration, $start_time, $logfunc, 1);
        }
    };

    my $res = PVE::Tools::lock_file($pvesr_lock_path, 60, $code);
    die $@ if $@;
}
475
4761;