1 package PVE
::Replication
;
7 use Time
::HiRes
qw(gettimeofday tv_interval);
12 use PVE
::CalendarEvent
;
14 use PVE
::AbstractConfig
;
20 use PVE
::GuestHelpers
;
21 use PVE
::ReplicationConfig
;
# Path of the JSON file that holds the persisted replication job state.
# Note: regression tests can overwrite $state_path for testing
our $state_path = '/var/lib/pve-manager/pve-replication-state.json';
# Store $state as the state of the job described by $jobcfg inside
# $stateobj, then write the complete $stateobj to $state_path as JSON.
#
# $stateobj - hash of all job states, keyed by guest id, then target id
# $jobcfg   - job configuration (reads the 'type' and 'guest' entries)
# $state    - state hash to record for this job
my $update_job_state = sub {
    my ($stateobj, $jobcfg, $state) = @_;

    my $target_id = PVE::ReplicationConfig->lookup($jobcfg->{type})
        ->get_unique_target_id($jobcfg);
    my $guest_id = $jobcfg->{guest};

    # Note: tuple ($guest_id, $target_id) is unique
    $stateobj->{$guest_id}->{$target_id} = $state;

    my $serialized = encode_json($stateobj);
    PVE::Tools::file_set_contents($state_path, $serialized);
};
# Look up the state of the job described by $jobcfg inside $stateobj.
#
# $stateobj - hash of all job states, keyed by guest id, then target id
# $jobcfg   - job configuration (reads the 'type' and 'guest' entries)
#
# Returns the state hash reference for the job. A job without recorded
# state gets a fresh hash; the counters below are defaulted to 0 if unset.
my $get_job_state = sub {
    my ($stateobj, $jobcfg) = @_;

    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});

    my $vmid = $jobcfg->{guest};
    my $tid = $plugin->get_unique_target_id($jobcfg);

    my $state = $stateobj->{$vmid}->{$tid} || {};

    $state->{last_iteration} //= 0;
    $state->{last_try} //= 0; # last sync start time
    $state->{last_sync} //= 0; # last successful sync start time
    $state->{fail_count} //= 0;

    # Return the hash explicitly: callers store the result as the job
    # state, so the value of the last '//=' must not leak out instead.
    return $state;
};
# Load the persisted state object from $state_path.
#
# Returns an empty hash reference when the file does not exist or is
# empty, otherwise the decoded JSON content.
my $read_state = sub {
    my $loaded = {};

    if (-e $state_path) {
        my $content = PVE::Tools::file_get_contents($state_path);
        $loaded = decode_json($content) if $content ne '';
    }

    return $loaded;
};
# Collect all replication jobs that are relevant for the local node and
# compute when each of them should run next.
#
# $stateobj - optional state object; read from $state_path when not given
#
# Returns a hash reference mapping job id to its job configuration. Each
# configuration is extended with the entries 'state', 'id', 'vmtype' and
# 'next_sync'. A 'next_sync' of 0 means the job is not scheduled
# ($get_next_job only picks jobs with a true 'next_sync').
#
# Dies if a job has a type other than 'local'.
sub job_status {
    my ($stateobj) = @_;

    my $local_node = PVE::INotify::nodename();

    my $jobs = {};

    $stateobj = $read_state->() if !$stateobj;

    my $cfg = PVE::ReplicationConfig->new();

    my $vms = PVE::Cluster::get_vmlist();

    foreach my $jobid (sort keys %{$cfg->{ids}}) {
        my $jobcfg = $cfg->{ids}->{$jobid};
        my $vmid = $jobcfg->{guest};

        die "internal error - not implemented" if $jobcfg->{type} ne 'local';

        # skip non existing vms
        next if !$vms->{ids}->{$vmid};

        # only consider guest on local node
        next if $vms->{ids}->{$vmid}->{node} ne $local_node;

        # never sync to local node
        next if $jobcfg->{target} eq $local_node;

        next if $jobcfg->{disable};

        my $state = $get_job_state->($stateobj, $jobcfg);
        $jobcfg->{state} = $state;
        $jobcfg->{id} = $jobid;
        $jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};

        # The three cases are mutually exclusive; otherwise the schedule
        # based value would always overwrite the other two.
        my $next_sync = 0;

        if ($jobcfg->{remove_job}) {
            $next_sync = 1; # lowest possible value
            # todo: consider fail_count? How many retries?
        } elsif (my $fail_count = $state->{fail_count}) {
            # retry with a growing delay; stays 0 after three failures
            if ($fail_count < 3) {
                $next_sync = $state->{last_try} + 5*60*$fail_count;
            }
        } else {
            my $schedule = $jobcfg->{schedule} || '*/15';
            my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
            $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $state->{last_try}) // 0;
        }

        $jobcfg->{next_sync} = $next_sync;

        $jobs->{$jobid} = $jobcfg;
    }

    return $jobs;
}
# Select the next job that is due, mark it as handled in this iteration
# and persist that mark.
#
# $stateobj   - state object passed on to job_status and $update_job_state
# $iteration  - stamp of the current scheduler run; jobs whose
#               'last_iteration' is not smaller are skipped
# $start_time - a job is due when $start_time >= its 'next_sync'
#
# Returns the job configuration of the selected job, or nothing when no
# job is due.
my $get_next_job = sub {
    my ($stateobj, $iteration, $start_time) = @_;

    my $next_jobid;

    my $jobs = job_status($stateobj);

    # Order: least recently handled first, then earliest 'next_sync',
    # then lowest guest id.
    my $sort_func = sub {
        my $joba = $jobs->{$a};
        my $jobb = $jobs->{$b};
        my $sa = $joba->{state};
        my $sb = $jobb->{state};
        # 'last_iteration' is a number (defaults to 0 and is set from
        # $iteration below), so it must be compared numerically; a string
        # comparison would order e.g. 9 after 10.
        my $res = $sa->{last_iteration} <=> $sb->{last_iteration};
        return $res if $res != 0;
        $res = $joba->{next_sync} <=> $jobb->{next_sync};
        return $res if $res != 0;
        return $joba->{guest} <=> $jobb->{guest};
    };

    foreach my $jobid (sort $sort_func keys %$jobs) {
        my $jobcfg = $jobs->{$jobid};
        next if $jobcfg->{state}->{last_iteration} >= $iteration;
        if ($jobcfg->{next_sync} && ($start_time >= $jobcfg->{next_sync})) {
            $next_jobid = $jobid;
            # take the first due job, so the sort order decides priority
            last;
        }
    }

    return if !$next_jobid;

    my $jobcfg = $jobs->{$next_jobid};

    $jobcfg->{state}->{last_iteration} = $iteration;
    $update_job_state->($stateobj, $jobcfg, $jobcfg->{state});

    return $jobcfg;
};
# Build the snapshot name used for a replication run.
#
# $jobid     - replication job id
# $last_sync - time stamp embedded into the snapshot name
#
# In list context returns ($prefix, $snapname), where $prefix is the part
# shared by all snapshots of the job; in scalar context returns only
# $snapname.
sub replication_snapshot_name {
    my ($jobid, $last_sync) = @_;

    my $prefix = 'replicate_' . $jobid . '_';
    my $snapname = $prefix . $last_sync . '_snap';

    return ($prefix, $snapname) if wantarray;
    return $snapname;
}
# Run 'pvesr prepare-local-job' on the target node via ssh and return
# what it reports.
#
# $ssh_info  - connection info, converted with ssh_info_to_command
# $jobid     - replication job id
# $vmid      - guest id
# $volumes   - array reference of volume ids (may be empty)
# $last_sync - value passed as '--last_sync'
# $force     - when true, '--force' is added to the command
#
# Returns the JSON-decoded output of the remote command.
# Dies if the command produced no decodable result.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $force) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
    my $cmd = [@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid, $vmid];
    push @$cmd, @$volumes if scalar(@$volumes);

    push @$cmd, '--last_sync', $last_sync;
    push @$cmd, '--force' if $force;

    my $remote_snapshots;

    # Output callback: decode each line of command output as JSON; the
    # last decoded line is the result.
    my $parser = sub {
        my $line = shift;
        $remote_snapshots = JSON::decode_json($line);
    };

    PVE::Tools::run_command($cmd, outfunc => $parser);

    die "prepare remote node failed - no result\n"
        if !defined($remote_snapshots);

    return $remote_snapshots;
}
# Run 'pvesr finalize-local-job' on the target node via ssh.
#
# $ssh_info  - connection info, converted with ssh_info_to_command
# $jobid     - replication job id
# $vmid      - guest id
# $volumes   - array reference of volume ids
# $last_sync - value passed as '--last_sync'
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync) = @_;

    my @cmd = @{ PVE::Cluster::ssh_info_to_command($ssh_info) };
    push @cmd, '--', 'pvesr', 'finalize-local-job', $jobid, $vmid;
    push @cmd, @$volumes;
    push @cmd, '--last_sync', $last_sync;

    return PVE::Tools::run_command(\@cmd);
}
# Remove stale replication snapshots of a job from the given volumes and
# report which volumes still have the snapshot of the last sync.
#
# $storecfg   - storage configuration passed to the PVE::Storage calls
# $volids     - array reference of volume ids to inspect
# $jobid      - replication job id
# $last_sync  - time stamp of the last sync; its snapshot is kept
# $start_time - passed as first argument to $logfunc
# $logfunc    - code reference called as $logfunc->($start_time, $message)
#
# Returns a hash reference with an entry (value 1) for every volume that
# has the snapshot belonging to $last_sync.
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $start_time, $logfunc) = @_;

    my ($prefix, $snapname) = replication_snapshot_name($jobid, $last_sync);

    my $last_snapshots = {};
    foreach my $volid (@$volids) {
        my $list = PVE::Storage::volume_snapshot_list($storecfg, $volid, $prefix);

        foreach my $snap (@$list) {
            if ($snap eq $snapname) {
                $last_snapshots->{$volid} = 1;
            } else {
                # Only snapshots other than the one from the last sync are
                # stale; the last sync snapshot must survive.
                $logfunc->($start_time, "$jobid: delete stale snapshot '$snap' on $volid");
                PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
            }
        }
    }

    return $last_snapshots;
}
# Transfer one volume to the target described by $ssh_info by calling
# PVE::Storage::storage_migrate with the same storage id and volume name
# as on the source side.
#
# $base_snapshot and $sync_snapname are handed through unchanged;
# $rate and $insecure are accepted but not used yet.
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure) = @_;

    my ($target_storeid, $target_volname) = PVE::Storage::parse_volume_id($volid);

    # fixme: handle $rate, $insecure ??
    my @migrate_args = (
        $storecfg, $volid, $ssh_info, $target_storeid, $target_volname,
        $base_snapshot, $sync_snapname,
    );

    return PVE::Storage::storage_migrate(@migrate_args);
}
# Remove the job $jobid from the replication configuration. The change is
# made inside the callback handed to PVE::ReplicationConfig::lock.
sub delete_job {
    my ($jobid) = @_;

    my $code = sub {
        my $cfg = PVE::ReplicationConfig->new();
        delete $cfg->{ids}->{$jobid};
        # Persist the change; without this the deletion only affects the
        # in-memory copy.
        # NOTE(review): assumes PVE::ReplicationConfig provides write() - confirm.
        $cfg->write();
    };

    PVE::ReplicationConfig::lock($code);
}
# Replicate all replicatable volumes of a guest to the job's target node,
# or remove the job when 'remove_job' is set in the job configuration.
#
# $jobcfg     - job configuration; reads 'type', 'target', 'id', 'guest',
#               'vmtype', 'remove_job' and 'rate'
# $last_sync  - time stamp of the last successful sync
# $start_time - time stamp of this run; must be greater than $last_sync
# $logfunc    - optional code reference called as
#               $logfunc->($start_time, $message)
#
# Dies on an unsupported job type or guest type, when $start_time is not
# after $last_sync, and when snapshot creation or the volume transfer
# fails (after trying to delete the snapshots created by this run).
sub replicate {
    my ($jobcfg, $last_sync, $start_time, $logfunc) = @_;

    $logfunc = sub {} if !$logfunc; # log nothing by default

    my $local_node = PVE::INotify::nodename();

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    my $migration_network;
    my $migration_type = 'secure';
    if (my $mc = $dc_conf->{migration}) {
        $migration_network = $mc->{network};
        $migration_type = $mc->{type} if defined($mc->{type});
    }

    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();

    die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
        if $start_time <= $last_sync;

    my $vmid = $jobcfg->{guest};
    my $vmtype = $jobcfg->{vmtype};

    my $conf;
    my $running;
    my $qga; # only set for running qemu guests with 'agent' configured
    my $volumes;

    if ($vmtype eq 'qemu') {
        $conf = PVE::QemuConfig->load_config($vmid);
        $running = PVE::QemuServer::check_running($vmid);
        $qga = PVE::QemuServer::qga_check_running($vmid)
            if $running && $conf->{agent};
        $volumes = PVE::QemuConfig->get_replicatable_volumes($storecfg, $conf);
    } elsif ($vmtype eq 'lxc') {
        $conf = PVE::LXC::Config->load_config($vmid);
        $running = PVE::LXC::check_running($vmid);
        $volumes = PVE::LXC::Config->get_replicatable_volumes($storecfg, $conf);
    } else {
        die "internal error";
    }

    my $sorted_volids = [ sort keys %$volumes ];

    $logfunc->($start_time, "$jobid: guest => $vmid, type => $vmtype, running => $running");
    $logfunc->($start_time, "$jobid: volumes => " . join(',', @$sorted_volids));

    if (my $remove_job = $jobcfg->{remove_job}) {

        $logfunc->($start_time, "$jobid: start job removal - mode '${remove_job}'");

        if ($remove_job eq 'full') {
            # remove all remote volumes
            remote_prepare_local_job($ssh_info, $jobid, $vmid, [], 0, 1);
        }

        # remove all local replication snapshots (lastsync => 0)
        prepare($storecfg, $sorted_volids, $jobid, 0, $start_time, $logfunc);

        delete_job($jobid); # update config
        $logfunc->($start_time, "$jobid: job removed");

        # the job is gone, so there is nothing left to sync
        return;
    }

    # prepare remote side
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $sorted_volids, $last_sync);

    # test if we have a replication_ snapshot from last sync
    # and remove all other/stale replication snapshots
    my $last_sync_snapname = replication_snapshot_name($jobid, $last_sync);
    my $sync_snapname = replication_snapshot_name($jobid, $start_time);

    my $last_snapshots = prepare(
        $storecfg, $sorted_volids, $jobid, $last_sync, $start_time, $logfunc);

    # freeze filesystem for data consistency
    # (only when a guest agent was detected above)
    if ($qga) {
        $logfunc->($start_time, "$jobid: freeze guest filesystem");
        PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-freeze");
    }

    # make snapshot of all volumes
    # Errors are captured rather than thrown immediately, so that the
    # guest filesystem is thawed before this function dies.
    my $replicate_snapshots = {};
    eval {
        foreach my $volid (@$sorted_volids) {
            $logfunc->($start_time, "$jobid: create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    my $err = $@;

    # unfreeze immediately
    if ($qga) {
        $logfunc->($start_time, "$jobid: unfreeze guest filesystem");
        eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw"); };
        warn $@ if $@; # ignore errors here, because we cannot fix it anyways
    }

    # Best-effort removal of snapshot $snapname from every volume listed
    # in $volid_hash; failures are reported as warnings only.
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;
        foreach my $volid (sort keys %$volid_hash) {
            $logfunc->($start_time, "$jobid: delete snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname, $running); };
            warn $@ if $@;
        }
    };

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $err;
    }

    eval {
        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        foreach my $volid (@$sorted_volids) {
            # stays undefined for a full sync
            my $base_snapname;

            if ($last_snapshots->{$volid} && $remote_snapshots->{$volid}) {
                $logfunc->($start_time, "$jobid: incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
                $base_snapname = $last_sync_snapname;
            } else {
                $logfunc->($start_time, "$jobid: full sync '$volid' ($sync_snapname)");
            }

            replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
        }
    };
    $err = $@;

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $err;
    }

    # remove old snapshots because they are no longer needed
    $cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);

    remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time);

    return;
}
# Run one replication job and record the outcome in the persisted state.
#
# $stateobj   - state object of all jobs
# $jobcfg     - job configuration including its 'state' entry (the entry
#               is removed from $jobcfg here)
# $start_time - time stamp of this run
# $logfunc    - optional code reference called as
#               $logfunc->($start_time, $message)
#
# Errors raised by the replication are caught: they increase
# 'fail_count' and are stored in the 'error' entry of the job state.
my $run_replication = sub {
    my ($stateobj, $jobcfg, $start_time, $logfunc) = @_;

    my $state = delete $jobcfg->{state};

    my $t0 = [gettimeofday];

    # cleanup stale pid/ptime state
    foreach my $vmid (keys %$stateobj) {
        foreach my $tid (keys %{$stateobj->{$vmid}}) {
            my $state = $stateobj->{$vmid}->{$tid};
            delete $state->{pid};
            delete $state->{ptime};
        }
    }

    # Mark the job as being run by this process. 'pid' has to be set
    # before its start time is looked up.
    $state->{pid} = $$;
    $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid});
    $state->{last_try} = $start_time;
    $update_job_state->($stateobj, $jobcfg, $state);

    $logfunc->($start_time, "$jobcfg->{id}: start replication job") if $logfunc;

    eval {
        my $timeout = 2; # do not wait too long - we repeat periodically anyways
        PVE::GuestHelpers::guest_migration_lock(
            $jobcfg->{guest}, $timeout, \&replicate,
            $jobcfg, $state->{last_sync}, $start_time, $logfunc);
    };
    my $err = $@;

    $state->{duration} = tv_interval($t0);
    delete $state->{pid};
    delete $state->{ptime};

    if ($err) {
        $state->{fail_count}++;
        $state->{error} = "$err";
        $update_job_state->($stateobj, $jobcfg, $state);
        if ($logfunc) {
            $logfunc->($start_time, "$jobcfg->{id}: end replication job with error: $err");
        } else {
            # no logger given - do not drop the error silently
            warn $err;
        }
    } else {
        $logfunc->($start_time, "$jobcfg->{id}: end replication job") if $logfunc;
        $state->{last_sync} = $start_time;
        $state->{fail_count} = 0;
        delete $state->{error};
        $update_job_state->($stateobj, $jobcfg, $state);
    }
};
# Run the single replication job $jobid right away.
# Arguments: $jobid (job to run), $now (time stamp used for this run),
# $logfunc (handed on to $run_replication).
# NOTE(review): the enclosing 'sub' header and the definition of $code
# (passed to lock_file at the end) are not visible in this excerpt;
# presumably the statements between the nodename lookup and the lock_file
# call form the body of $code - confirm against the full file.
# NOTE(review): no default for an undefined $now is visible here.
465 my ($jobid, $now, $logfunc) = @_; # passing $now useful for regression testing
467 my $local_node = PVE
::INotify
::nodename
();
472 my $stateobj = $read_state->();
474 my $cfg = PVE
::ReplicationConfig-
>new();
# Validate the request: the job must exist, be of type 'local' and must
# not be disabled.
476 my $jobcfg = $cfg->{ids
}->{$jobid};
477 die "no such job '$jobid'\n" if !$jobcfg;
479 die "internal error - not implemented" if $jobcfg->{type
} ne 'local';
481 die "job '$jobid' is disabled\n" if $jobcfg->{disable
};
# The guest must exist, live on the local node, and the target must be a
# different node.
483 my $vms = PVE
::Cluster
::get_vmlist
();
484 my $vmid = $jobcfg->{guest
};
486 die "no such guest '$vmid'\n" if !$vms->{ids
}->{$vmid};
488 die "guest '$vmid' is not on local node\n"
489 if $vms->{ids
}->{$vmid}->{node
} ne $local_node;
491 die "unable to sync to local node\n" if $jobcfg->{target
} eq $local_node;
# Extend the job configuration with the entries 'state', 'id' and 'vmtype'.
493 $jobcfg->{state} = $get_job_state->($stateobj, $jobcfg);
494 $jobcfg->{id
} = $jobid;
495 $jobcfg->{vmtype
} = $vms->{ids
}->{$vmid}->{type
};
# Record $now as the iteration that handled this job and persist it
# before the replication starts.
497 $jobcfg->{state}->{last_iteration
} = $now;
498 $update_job_state->($stateobj, $jobcfg, $jobcfg->{state});
500 $run_replication->($stateobj, $jobcfg, $now, $logfunc);
# Execute $code while holding a lock on the state file.
# NOTE(review): 60 is presumably the lock timeout in seconds - confirm
# against PVE::Tools::lock_file.
503 my $res = PVE
::Tools
::lock_file
($state_path, 60, $code);
# Run every replication job that is currently due, one after another.
# Arguments: $now (optional fixed time stamp), $logfunc (handed on to
# $run_replication).
# NOTE(review): the enclosing 'sub' header and the definition of $code
# (passed to lock_file at the end) are not visible in this excerpt;
# presumably the state read and the loop form the body of $code - confirm
# against the full file.
508 my ($now, $logfunc) = @_; # useful for regression testing
# The iteration stamp is fixed once per call; $get_next_job skips jobs
# whose 'last_iteration' is not smaller than it.
510 my $iteration = $now // time();
513 my $stateobj = $read_state->();
514 my $start_time = $now // time();
# Keep fetching due jobs until none is left; the start time is refreshed
# after each job unless a fixed $now was given.
516 while (my $jobcfg = $get_next_job->($stateobj, $iteration, $start_time)) {
517 $run_replication->($stateobj, $jobcfg, $start_time, $logfunc);
518 $start_time = $now // time();
# Execute $code while holding a lock on the state file.
# NOTE(review): 60 is presumably the lock timeout in seconds - confirm
# against PVE::Tools::lock_file.
522 my $res = PVE
::Tools
::lock_file
($state_path, 60, $code);