1 package PVE
::Replication
;
7 use Time
::HiRes
qw(gettimeofday tv_interval);
8 use POSIX
qw(strftime);
15 use PVE
::GuestHelpers
;
16 use PVE
::ReplicationConfig
;
17 use PVE
::ReplicationState
;
# NOTE(review): the enclosing `sub` header is not visible in this excerpt; the
# call `get_log_time()` in the log wrapper further down suggests these lines are
# its body - confirm.
20 # regression tests should override this
# Returns the current local time formatted as "%F %H:%M:%S"
# (i.e. YYYY-MM-DD HH:MM:SS).
23 return strftime
("%F %H:%M:%S", localtime);
# Find the common base replication snapshot, i.e. one that is available on both
# the local and the remote side, for every volume in $volumes.
# Note: this also removes stale replication snapshots.
#
# Parameters:
#   $ssh_info, $vmid, $storeid_list - passed through to remote_prepare_local_job()
#   $jobid, $last_sync   - used to compute the name of the last sync snapshot
#   $storecfg            - passed through to prepare()
#   $volumes             - array ref of volume IDs to inspect
#   $parent_snapname     - optional fallback snapshot name (may be undef)
#   $logfunc             - logging callback, passed through
#
# For each volume the snapshot of the last sync is preferred; if it is not
# present on both sides, $parent_snapname is used when it is defined and present
# on both sides. Volumes without a common snapshot get no entry.
#
# Returns the list ($base_snapshots, $last_snapshots, $last_sync_snapname).
sub find_common_replication_snapshot {
    my ($ssh_info, $jobid, $vmid, $storecfg, $volumes, $storeid_list, $last_sync, $parent_snapname, $logfunc) = @_;

    my $last_sync_snapname = PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);

    # Test if we have a replication_ snapshot from the last sync
    # and remove all other/stale replication snapshots.
    my $last_snapshots = prepare($storecfg, $volumes, $jobid, $last_sync, $parent_snapname, $logfunc);

    # Same question for the remote side (force => 0).
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, 0, $logfunc,
    );

    my $base_snapshots = {};

    for my $volid (@$volumes) {
        my $local_snaps = $last_snapshots->{$volid};
        my $remote_snaps = $remote_snapshots->{$volid};

        # A base snapshot needs to be known on both sides.
        next if !defined($local_snaps) || !defined($remote_snaps);

        if ($local_snaps->{$last_sync_snapname} && $remote_snaps->{$last_sync_snapname}) {
            $base_snapshots->{$volid} = $last_sync_snapname;
            next;
        }

        next if !defined($parent_snapname);

        if ($local_snaps->{$parent_snapname} && $remote_snaps->{$parent_snapname}) {
            $base_snapshots->{$volid} = $parent_snapname;
        }
    }

    return ($base_snapshots, $last_snapshots, $last_sync_snapname);
}
# Runs `pvesr prepare-local-job <jobid>` on the remote node via the SSH command
# built from $ssh_info and returns the data decoded from the JSON output.
#
# Parameters:
#   $ssh_info        - passed to PVE::Cluster::ssh_info_to_command()
#   $jobid           - replication job ID (positional argument of the command)
#   $vmid            - not used in the lines visible here
#   $volumes         - array ref of volume IDs, appended if non-empty
#   $storeid_list    - array ref of storage IDs, joined with ',' for --scan if non-empty
#   $last_sync       - value for --last_sync
#   $parent_snapname - value for --parent_snapname
#   $force           - appends --force when true
#   $logfunc         - callback receiving lines prefixed with "(remote_prepare_local_job)"
#
# Dies with "prepare remote node failed - no result" if nothing was decoded.
#
# NOTE(review): the declaration of $remote_snapshots, the $parser/$logger
# closures and the statement terminator (presumably a condition) of the
# --parent_snapname push are not visible in this excerpt.
64 sub remote_prepare_local_job
{
65 my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;
67 my $ssh_cmd = PVE
::Cluster
::ssh_info_to_command
($ssh_info);
# '--' separates the ssh invocation from the remote command line
68 my $cmd = [@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid];
69 push @$cmd, '--scan', join(',', @$storeid_list) if scalar(@$storeid_list);
70 push @$cmd, @$volumes if scalar(@$volumes);
72 push @$cmd, '--last_sync', $last_sync;
73 push @$cmd, '--parent_snapname', $parent_snapname
75 push @$cmd, '--force' if $force;
# each output line is decoded as JSON; a later line replaces an earlier result
81 $remote_snapshots = JSON
::decode_json
($line);
87 $logfunc->("(remote_prepare_local_job) $line");
# stdout goes to $parser, stderr to $logger
90 PVE
::Tools
::run_command
($cmd, outfunc
=> $parser, errfunc
=> $logger);
92 die "prepare remote node failed - no result\n"
93 if !defined($remote_snapshots);
95 return $remote_snapshots;
# Runs `pvesr finalize-local-job <jobid> <volumes...> --last_sync <last_sync>`
# on the remote node via the SSH command built from $ssh_info.
#
# Parameters:
#   $ssh_info  - passed to PVE::Cluster::ssh_info_to_command()
#   $jobid     - replication job ID
#   $vmid      - not used in the lines visible here
#   $volumes   - array ref of volume IDs appended to the command line
#   $last_sync - value for --last_sync
#   $logfunc   - callback receiving lines prefixed with "(remote_finalize_local_job)"
#
# NOTE(review): the definition of the $logger closure is only partially
# visible in this excerpt.
98 sub remote_finalize_local_job
{
99 my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;
101 my $ssh_cmd = PVE
::Cluster
::ssh_info_to_command
($ssh_info);
102 my $cmd = [@$ssh_cmd, '--', 'pvesr', 'finalize-local-job', $jobid,
103 @$volumes, '--last_sync', $last_sync];
108 $logfunc->("(remote_finalize_local_job) $line");
# both stdout and stderr of the remote command are forwarded to $logger
111 PVE
::Tools
::run_command
($cmd, outfunc
=> $logger, errfunc
=> $logger);
# NOTE(review): the `sub` header is not visible in this excerpt; other code in
# this file calls these lines as prepare($storecfg, $volids, $jobid, $last_sync,
# $parent_snapname, $logfunc).
114 # Finds the local replication snapshots from $last_sync
115 # and removes all replication snapshots with other time stamps.
#
# For every volume in $volids the snapshot list is read from PVE::Storage.
# Snapshots named $snapname (derived from $jobid and $last_sync) or
# $parent_snapname are recorded; every other snapshot whose name starts with
# $prefix is deleted as stale.
#
# Returns $last_snapshots ({ $volid => { $snap => 1 } }) in scalar context; in
# list context additionally $cleaned_replicated_volumes ({ $volid => 1 } for
# volumes where a stale snapshot delete was issued).
117 my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;
121 my ($prefix, $snapname);
123 if (defined($jobid)) {
124 ($prefix, $snapname) = PVE
::ReplicationState
::replication_snapshot_name
($jobid, $last_sync);
# presumably the fallback prefix when $jobid is undefined - the `else` line is
# not visible here, confirm
126 $prefix = '__replicate_';
129 my $last_snapshots = {};
130 my $cleaned_replicated_volumes = {};
131 foreach my $volid (@$volids) {
132 my $list = PVE
::Storage
::volume_snapshot_list
($storecfg, $volid);
133 foreach my $snap (@$list) {
134 if ((defined($snapname) && ($snap eq $snapname)) ||
135 (defined($parent_snapname) && ($snap eq $parent_snapname))) {
136 $last_snapshots->{$volid}->{$snap} = 1;
137 } elsif ($snap =~ m/^\Q$prefix\E/) {
138 $logfunc->("delete stale replication snapshot '$snap' on $volid");
141 PVE
::Storage
::volume_snapshot_delete
($storecfg, $volid, $snap);
142 $cleaned_replicated_volumes->{$volid} = 1;
145 # If deleting the snapshot fails, we can not be sure if it was due to an error or a timeout.
146 # The likelihood that the delete has worked out is high at a timeout.
147 # If it really fails, it will try to remove on the next run.
149 # warn is for syslog/journal.
152 # logfunc output is written to the replication log.
153 $logfunc->("delete stale replication snapshot error: $err");
# context-sensitive return: list context also yields the cleaned volumes
159 return wantarray ?
($last_snapshots, $cleaned_replicated_volumes) : $last_snapshots;
# Replicate a single volume to the target via PVE::Storage::storage_migrate().
#
# Parameters:
#   $ssh_info      - SSH info of the target, passed through to storage_migrate()
#   $storecfg      - storage configuration, passed through
#   $volid         - volume ID; split with PVE::Storage::parse_volume_id() into
#                    storage ID and volume name, which are used as target
#   $base_snapshot - base snapshot for an incremental transfer (undef if none)
#   $sync_snapname - snapshot that gets transferred
#   $rate          - optional rate limit; multiplied by 1000000 before being
#                    handed over (presumably MB/s => bytes/s - confirm)
#   $insecure      - passed through to storage_migrate()
#   $logfunc       - logging callback, passed through
#
# Returns whatever storage_migrate() returns.
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure, $logfunc) = @_;

    my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);

    # Declare unconditionally: `my $x = ... if COND;` has undefined behaviour
    # (see perlsyn, "Statement Modifiers"), so use a ternary instead.
    my $ratelimit_bps = $rate ? int(1000000 * $rate) : undef;

    # NOTE(review): the literal 1 is a flag of storage_migrate(); see its signature.
    return PVE::Storage::storage_migrate(
        $storecfg, $volid, $ssh_info, $storeid, $volname,
        $base_snapshot, $sync_snapname, $ratelimit_bps, $insecure, 1, $logfunc,
    );
}
# Worker that performs one replication run; invoked elsewhere in this file as
# replicate($guest_class, $jobcfg, $state, $start_time, $logfunc).
#
# NOTE(review): the `sub` header and a number of lines (closing braces,
# eval/error handling, a few declarations such as $sync_snapname and
# $base_snapname) are not visible in this excerpt.
#
# Visible flow:
#   1. read migration network/type from datacenter.cfg (type defaults to 'secure')
#   2. abort if $start_time <= $state->{last_sync}
#   3. load the guest config and collect the replicatable volumes, sorted by ID
#   4. if $jobcfg->{remove_job} is set: remove replication snapshots and delete the job
#   5. otherwise: determine common base snapshots, snapshot all volumes (with
#      guest filesystem freeze/thaw calls), replicate every volume, delete the
#      previous local snapshots and finalize the job on the remote side
174 my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;
176 my $local_node = PVE
::INotify
::nodename
();
# only job type 'local' is handled
178 die "not implemented - internal error" if $jobcfg->{type
} ne 'local';
180 my $dc_conf = PVE
::Cluster
::cfs_read_file
('datacenter.cfg');
# defaults, overridden by the 'migration' section of datacenter.cfg if present
182 my $migration_network;
183 my $migration_type = 'secure';
184 if (my $mc = $dc_conf->{migration
}) {
185 $migration_network = $mc->{network
};
186 $migration_type = $mc->{type
} if defined($mc->{type
});
189 my $jobid = $jobcfg->{id
};
190 my $storecfg = PVE
::Storage
::config
();
191 my $last_sync = $state->{last_sync
};
193 die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
194 if $start_time <= $last_sync;
196 my $vmid = $jobcfg->{guest
};
198 my $conf = $guest_class->load_config($vmid);
199 my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
200 my $volumes = $guest_class->get_replicatable_volumes($storecfg, $vmid, $conf, defined($jobcfg->{remove_job
}));
# sorted so that volumes are always processed in the same order
202 my $sorted_volids = [ sort keys %$volumes ];
204 $running //= 0; # to avoid undef warnings from logfunc
206 my $guest_name = $guest_class->guest_type() . ' ' . $vmid;
208 $logfunc->("guest => $guest_name, running => $running");
209 $logfunc->("volumes => " . join(',', @$sorted_volids));
# job removal mode
211 if (my $remove_job = $jobcfg->{remove_job
}) {
213 $logfunc->("start job removal - mode '${remove_job}'");
215 if ($remove_job eq 'full' && $jobcfg->{target
} ne $local_node) {
216 # remove all remote volumes
217 my @store_list = map { (PVE
::Storage
::parse_volume_id
($_))[0] } @$sorted_volids;
# de-duplicate the storage IDs
219 my %hash = map { $_ => 1 } @store_list;
221 my $ssh_info = PVE
::Cluster
::get_ssh_info
($jobcfg->{target
});
# called with no volumes, last_sync => 1, no parent snapshot and force => 1
223 remote_prepare_local_job
($ssh_info, $jobid, $vmid, [], [ keys %hash ], 1, undef, 1, $logfunc);
226 # remove all local replication snapshots (lastsync => 0)
227 prepare
($storecfg, $sorted_volids, $jobid, 0, undef, $logfunc);
229 PVE
::ReplicationConfig
::delete_job
($jobid); # update config
230 $logfunc->("job removed");
# regular replication run
235 my $ssh_info = PVE
::Cluster
::get_ssh_info
($jobcfg->{target
}, $migration_network);
237 my $parent_snapname = $conf->{parent
};
239 my ($base_snapshots, $last_snapshots, $last_sync_snapname) = find_common_replication_snapshot
(
240 $ssh_info, $jobid, $vmid, $storecfg, $sorted_volids, $state->{storeid_list
}, $last_sync, $parent_snapname, $logfunc);
# remember the storages used by this run in the job state
242 my $storeid_hash = {};
243 foreach my $volid (@$sorted_volids) {
244 my ($storeid) = PVE
::Storage
::parse_volume_id
($volid);
245 $storeid_hash->{$storeid} = 1;
247 $state->{storeid_list
} = [ sort keys %$storeid_hash ];
249 # freeze filesystem for data consistency
251 $logfunc->("freeze guest filesystem");
252 $guest_class->__snapshot_freeze($vmid, 0);
255 # make snapshot of all volumes
257 PVE
::ReplicationState
::replication_snapshot_name
($jobid, $start_time);
# volumes for which the new sync snapshot was created
259 my $replicate_snapshots = {};
261 foreach my $volid (@$sorted_volids) {
262 $logfunc->("create snapshot '${sync_snapname}' on $volid");
263 PVE
::Storage
::volume_snapshot
($storecfg, $volid, $sync_snapname);
264 $replicate_snapshots->{$volid} = 1;
271 $logfunc->("thaw guest filesystem");
272 $guest_class->__snapshot_freeze($vmid, 1);
# helper: delete snapshot $snapname on all volumes that are keys of $volid_hash;
# each delete is wrapped in eval so one failure does not stop the loop
275 my $cleanup_local_snapshots = sub {
276 my ($volid_hash, $snapname) = @_;
277 foreach my $volid (sort keys %$volid_hash) {
278 $logfunc->("delete previous replication snapshot '$snapname' on $volid");
279 eval { PVE
::Storage
::volume_snapshot_delete
($storecfg, $volid, $snapname); };
285 $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
291 my $rate = $jobcfg->{rate
};
292 my $insecure = $migration_type eq 'insecure';
294 foreach my $volid (@$sorted_volids) {
# incremental sync if a common base snapshot is known for the volume
297 if (defined($base_snapname = $base_snapshots->{$volid})) {
298 $logfunc->("incremental sync '$volid' ($base_snapname => $sync_snapname)");
300 $logfunc->("full sync '$volid' ($sync_snapname)");
303 replicate_volume
($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure, $logfunc);
308 $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
309 # we do not clean up the remote side here - this is done in
310 # the next run of prepare_local_job
314 # remove old snapshots because they are no longer needed
315 $cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);
318 remote_finalize_local_job
($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
321 # old snapshots will be removed by the next run of prepare_local_job.
323 # warn is for syslog/journal.
326 # logfunc output is written to the replication log.
327 $logfunc->("delete stale replication snapshot error: $err");
# Runs one replication job without taking the guest migration lock itself;
# run_replication() hands this code ref to guest_migration_lock().
#
# It records job start and end in the job state, (re)creates the per-job log
# file and wraps $logfunc so that every message is also written to that file
# with a timestamp and the job ID.
#
# NOTE(review): several lines (eval/error handling, closing braces, the
# declarations of $volumes, $msg and $err) are not visible in this excerpt.
333 my $run_replication_nolock = sub {
334 my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;
336 my $jobid = $jobcfg->{id
};
340 # we normally write errors into the state file,
341 # but we also catch unexpected errors and log them to syslog
342 # (for example when there are problems writing the state file)
344 my $state = PVE
::ReplicationState
::read_job_state
($jobcfg);
346 PVE
::ReplicationState
::record_job_start
($jobcfg, $state, $start_time, $iteration);
# start of the run, used to compute the duration passed to record_job_end()
348 my $t0 = [gettimeofday
];
# return value of mkdir is not checked
350 mkdir $PVE::ReplicationState
::replicate_logdir
;
351 my $logfile = PVE
::ReplicationState
::job_logfile_name
($jobid);
# '>' truncates the log file of a previous run
352 open(my $logfd, '>', $logfile) ||
353 die "unable to open replication log '$logfile' - $!\n";
# wrapper handed to replicate(): writes to the job log file and forwards the
# same timestamped line to $logfunc
355 my $logfunc_wrapper = sub {
358 my $ctime = get_log_time
();
359 print $logfd "$ctime $jobid: $msg\n";
362 $logfunc->("$ctime $jobid: $msg");
369 $logfunc_wrapper->("start replication job");
372 $volumes = replicate
($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
377 my $msg = "end replication job with error: $err";
379 $logfunc_wrapper->($msg);
381 $logfunc_wrapper->("end replication job");
# duration is the wall-clock time since $t0; $err is recorded along with it
384 PVE
::ReplicationState
::record_job_end
($jobcfg, $state, $start_time, tv_interval
($t0), $err);
# Public entry point: runs $run_replication_nolock under the guest migration
# lock of $jobcfg->{guest}, forwarding all of its own arguments.
#
# Parameters: $guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose
# are passed unchanged to $run_replication_nolock.
#
# NOTE(review): the declaration of $volumes and the end of this sub are not
# visible in this excerpt.
393 sub run_replication
{
394 my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;
398 my $timeout = 2; # do not wait too long - we repeat periodically anyways
399 $volumes = PVE
::GuestHelpers
::guest_migration_lock
(
400 $jobcfg->{guest
}, $timeout, $run_replication_nolock,
401 $guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose);