1 package PVE
::Replication
;
7 use Time
::HiRes
qw(gettimeofday tv_interval);
12 use PVE
::CalendarEvent
;
14 use PVE
::AbstractConfig
;
16 use PVE
::GuestHelpers
;
17 use PVE
::ReplicationConfig
;
18 use PVE
::ReplicationState
;
20 our $replicate_logdir = "/var/log/pve/replicate";
22 # regression tests should overwrite this
# Build the path of the log file for one replication job.
#
# Parameters:
#   $jobid - replication job id; used verbatim as the file name.
# Returns:
#   "$replicate_logdir/$jobid" as a string.
#
# NOTE(review): the argument unpacking and the closing brace were missing from
# the listing; $jobid is taken to be the first argument because callers pass
# exactly one value (job_logfile_name($jobid)) - confirm against upstream.
sub job_logfile_name {
    my ($jobid) = @_;

    return "${replicate_logdir}/$jobid";
}
29 # regression tests should overwrite this
37 my $local_node = PVE
::INotify
::nodename
();
41 my $stateobj = PVE
::ReplicationState
::read_state
();
43 my $cfg = PVE
::ReplicationConfig-
>new();
45 my $vms = PVE
::Cluster
::get_vmlist
();
47 foreach my $jobid (sort keys %{$cfg->{ids
}}) {
48 my $jobcfg = $cfg->{ids
}->{$jobid};
49 my $vmid = $jobcfg->{guest
};
51 die "internal error - not implemented" if $jobcfg->{type
} ne 'local';
53 # skip non existing vms
54 next if !$vms->{ids
}->{$vmid};
56 # only consider guest on local node
57 next if $vms->{ids
}->{$vmid}->{node
} ne $local_node;
59 if (!$jobcfg->{remove_job
}) {
60 # never sync to local node
61 next if $jobcfg->{target
} eq $local_node;
63 next if $jobcfg->{disable
};
66 my $state = PVE
::ReplicationState
::extract_job_state
($stateobj, $jobcfg);
67 $jobcfg->{state} = $state;
68 $jobcfg->{id
} = $jobid;
69 $jobcfg->{vmtype
} = $vms->{ids
}->{$vmid}->{type
};
73 if ($jobcfg->{remove_job
}) {
74 $next_sync = 1; # lowest possible value
75 # todo: consider fail_count? How many retries?
77 if (my $fail_count = $state->{fail_count
}) {
78 if ($fail_count < 3) {
79 $next_sync = $state->{last_try
} + 5*60*$fail_count;
82 my $schedule = $jobcfg->{schedule
} || '*/15';
83 my $calspec = PVE
::CalendarEvent
::parse_calendar_event
($schedule);
84 $next_sync = PVE
::CalendarEvent
::compute_next_event
($calspec, $state->{last_try
}) // 0;
88 $jobcfg->{next_sync
} = $next_sync;
90 $jobs->{$jobid} = $jobcfg;
97 my ($iteration, $start_time) = @_;
99 my $jobs = job_status
();
# Comparator handed to sort() over the keys of $jobs. Orders job ids by
#   1. state->{last_iteration}, ascending
#   2. next_sync, ascending
#   3. guest, ascending
# It reads the enclosing $jobs hash and the sort globals $a / $b.
my $sort_func = sub {
    my $joba = $jobs->{$a};
    my $jobb = $jobs->{$b};
    my $sa = $joba->{state};
    my $sb = $jobb->{state};

    # last_iteration is a number (it is compared with ">=" against $iteration
    # further down), so it must be ordered numerically; the string operator
    # "cmp" would place iteration 10 before iteration 9.
    my $res = $sa->{last_iteration} <=> $sb->{last_iteration};
    return $res if $res != 0;

    $res = $joba->{next_sync} <=> $jobb->{next_sync};
    return $res if $res != 0;

    return $joba->{guest} <=> $jobb->{guest};
};
113 foreach my $jobid (sort $sort_func keys %$jobs) {
114 my $jobcfg = $jobs->{$jobid};
115 next if $jobcfg->{state}->{last_iteration
} >= $iteration;
116 if ($jobcfg->{next_sync
} && ($start_time >= $jobcfg->{next_sync
})) {
# Run "pvesr prepare-local-job" on the replication target through ssh and
# return the decoded JSON result.
#
# Parameters:
#   $ssh_info        - passed to PVE::Cluster::ssh_info_to_command()
#   $jobid           - replication job id
#   $vmid            - accepted for interface compatibility; not used here
#   $volumes         - array ref of volume ids appended to the command
#   $storeid_list    - array ref of storage ids, passed as "--scan a,b,c"
#                      when non-empty
#   $last_sync       - value passed as "--last_sync"
#   $parent_snapname - optional, passed as "--parent_snapname" when set
#   $force           - when true, "--force" is added
#   $logfunc         - code ref receiving every stderr line of the command
# Returns:
#   The data structure decoded from the command's stdout.
# Dies:
#   "prepare remote node failed - no result\n" if nothing was decoded, plus
#   whatever run_command / decode_json throw.
#
# NOTE(review): the condition of the "--parent_snapname" push and the bodies
# of $parser / $logger were missing from the listing. They are reconstructed
# from the names the visible code uses ($parser, $logger, $line) and from the
# fact that callers pass undef for $parent_snapname - confirm against upstream.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
    my $cmd = [@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid];
    push @$cmd, '--scan', join(',', @$storeid_list) if scalar(@$storeid_list);
    push @$cmd, @$volumes if scalar(@$volumes);

    push @$cmd, '--last_sync', $last_sync;
    # never push an undefined value into the argument list
    push @$cmd, '--parent_snapname', $parent_snapname
        if $parent_snapname;
    push @$cmd, '--force' if $force;

    my $remote_snapshots;

    # stdout: each line is decoded as JSON; the last decoded line wins
    my $parser = sub {
        my $line = shift;
        $remote_snapshots = JSON::decode_json($line);
    };

    # stderr: forwarded to the caller's log function
    my $logger = sub {
        my $line = shift;
        chomp $line;
        $logfunc->("(remote_prepare_local_job) $line");
    };

    PVE::Tools::run_command($cmd, outfunc => $parser, errfunc => $logger);

    die "prepare remote node failed - no result\n"
        if !defined($remote_snapshots);

    return $remote_snapshots;
}
# Run "pvesr finalize-local-job" on the replication target through ssh.
#
# Parameters:
#   $ssh_info  - passed to PVE::Cluster::ssh_info_to_command()
#   $jobid     - replication job id
#   $vmid      - accepted for interface compatibility; not used here
#   $volumes   - array ref of volume ids appended to the command
#   $last_sync - value passed as "--last_sync"
#   $logfunc   - code ref receiving every stdout and stderr line
# Returns:
#   Whatever PVE::Tools::run_command() returns (last statement of the sub).
#
# NOTE(review): the body of $logger was missing from the listing; it is
# reconstructed from the visible use of $logger and $line - confirm against
# upstream.
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
    my $cmd = [@$ssh_cmd, '--', 'pvesr', 'finalize-local-job', $jobid,
               @$volumes, '--last_sync', $last_sync];

    # both output streams go to the caller's log function
    my $logger = sub {
        my $line = shift;
        chomp $line;
        $logfunc->("(remote_finalize_local_job) $line");
    };

    PVE::Tools::run_command($cmd, outfunc => $logger, errfunc => $logger);
}
174 # finds local replication snapshots from $last_sync
175 # and removes all replication snapshots with other time stamps
177 my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;
181 my ($prefix, $snapname) =
182 PVE
::ReplicationState
::replication_snapshot_name
($jobid, $last_sync);
184 my $last_snapshots = {};
185 my $cleaned_replicated_volumes = {};
186 foreach my $volid (@$volids) {
187 my $list = PVE
::Storage
::volume_snapshot_list
($storecfg, $volid);
188 foreach my $snap (@$list) {
189 if ($snap eq $snapname || (defined($parent_snapname) && ($snap eq $parent_snapname))) {
190 $last_snapshots->{$volid}->{$snap} = 1;
191 } elsif ($snap =~ m/^\Q$prefix\E/) {
192 $logfunc->("delete stale replication snapshot '$snap' on $volid");
193 PVE
::Storage
::volume_snapshot_delete
($storecfg, $volid, $snap);
194 $cleaned_replicated_volumes->{$volid} = 1;
199 return wantarray ?
($last_snapshots, $cleaned_replicated_volumes) : $last_snapshots;
# Transfer one volume to the replication target.
#
# Parameters:
#   $ssh_info      - connection info handed to storage_migrate()
#   $storecfg      - storage configuration
#   $volid         - volume id to replicate
#   $base_snapshot - snapshot to send incrementally from; callers leave it
#                    undefined for a full sync
#   $sync_snapname - snapshot to send
#   $rate          - accepted but currently ignored (see fixme)
#   $insecure      - accepted but currently ignored (see fixme)
# Returns:
#   Whatever PVE::Storage::storage_migrate() returns (last statement).
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure) = @_;

    # the target keeps the same storage id and volume name as the source
    my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);

    # fixme: handle $rate, $insecure ??
    PVE::Storage::storage_migrate($storecfg, $volid, $ssh_info, $storeid, $volname,
                                  $base_snapshot, $sync_snapname);
}
216 my $cfg = PVE
::ReplicationConfig-
>new();
217 delete $cfg->{ids
}->{$jobid};
221 PVE
::ReplicationConfig
::lock($code);
225 my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;
227 my $local_node = PVE
::INotify
::nodename
();
229 die "not implemented - internal error" if $jobcfg->{type
} ne 'local';
231 my $dc_conf = PVE
::Cluster
::cfs_read_file
('datacenter.cfg');
233 my $migration_network;
234 my $migration_type = 'secure';
235 if (my $mc = $dc_conf->{migration
}) {
236 $migration_network = $mc->{network
};
237 $migration_type = $mc->{type
} if defined($mc->{type
});
240 my $jobid = $jobcfg->{id
};
241 my $storecfg = PVE
::Storage
::config
();
242 my $last_sync = $state->{last_sync
};
244 die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
245 if $start_time <= $last_sync;
247 my $vmid = $jobcfg->{guest
};
248 my $vmtype = $jobcfg->{vmtype
};
250 my $conf = $guest_class->load_config($vmid);
251 my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
252 my $volumes = $guest_class->get_replicatable_volumes($storecfg, $conf);
254 my $sorted_volids = [ sort keys %$volumes ];
256 $running //= 0; # to avoid undef warnings from logfunc
258 $logfunc->("guest => $vmid, type => $vmtype, running => $running");
259 $logfunc->("volumes => " . join(',', @$sorted_volids));
261 if (my $remove_job = $jobcfg->{remove_job
}) {
263 $logfunc->("start job removal - mode '${remove_job}'");
265 if ($remove_job eq 'full' && $jobcfg->{target
} ne $local_node) {
266 # remove all remote volumes
267 my $ssh_info = PVE
::Cluster
::get_ssh_info
($jobcfg->{target
});
268 remote_prepare_local_job
($ssh_info, $jobid, $vmid, [], $state->{storeid_list
}, 0, undef, 1, $logfunc);
271 # remove all local replication snapshots (lastsync => 0)
272 prepare
($storecfg, $sorted_volids, $jobid, 0, undef, $logfunc);
274 delete_job
($jobid); # update config
275 $logfunc->("job removed");
280 my $ssh_info = PVE
::Cluster
::get_ssh_info
($jobcfg->{target
}, $migration_network);
282 my $last_sync_snapname =
283 PVE
::ReplicationState
::replication_snapshot_name
($jobid, $last_sync);
285 PVE
::ReplicationState
::replication_snapshot_name
($jobid, $start_time);
287 my $parent_snapname = $conf->{parent
};
289 # test if we have a replication_ snapshot from last sync
290 # and remove all other/stale replication snapshots
292 my $last_snapshots = prepare
(
293 $storecfg, $sorted_volids, $jobid, $last_sync, $parent_snapname, $logfunc);
295 # prepare remote side
296 my $remote_snapshots = remote_prepare_local_job
(
297 $ssh_info, $jobid, $vmid, $sorted_volids, $state->{storeid_list
}, $last_sync, $parent_snapname, 0, $logfunc);
299 my $storeid_hash = {};
300 foreach my $volid (@$sorted_volids) {
301 my ($storeid) = PVE
::Storage
::parse_volume_id
($volid);
302 $storeid_hash->{$storeid} = 1;
304 $state->{storeid_list
} = [ sort keys %$storeid_hash ];
306 # freeze filesystem for data consistency
308 $logfunc->("freeze guest filesystem");
309 $guest_class->__snapshot_freeze($vmid, 0);
312 # make snapshot of all volumes
313 my $replicate_snapshots = {};
315 foreach my $volid (@$sorted_volids) {
316 $logfunc->("create snapshot '${sync_snapname}' on $volid");
317 PVE
::Storage
::volume_snapshot
($storecfg, $volid, $sync_snapname);
318 $replicate_snapshots->{$volid} = 1;
323 # unfreeze immediately
325 $guest_class->__snapshot_freeze($vmid, 1);
# Best-effort removal of snapshot $snapname from every volume that is a key
# of $volid_hash. Uses $logfunc and $storecfg from the enclosing scope.
# A failing delete does not abort the loop, but it is reported instead of
# being dropped silently.
my $cleanup_local_snapshots = sub {
    my ($volid_hash, $snapname) = @_;

    foreach my $volid (sort keys %$volid_hash) {
        $logfunc->("delete previous replication snapshot '$snapname' on $volid");
        eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname); };
        # keep going with the remaining volumes, but do not hide the failure
        warn $@ if $@;
    }
};
338 $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
344 my $rate = $jobcfg->{rate
};
345 my $insecure = $migration_type eq 'insecure';
347 foreach my $volid (@$sorted_volids) {
350 if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) {
351 if ($last_snapshots->{$volid}->{$last_sync_snapname} &&
352 $remote_snapshots->{$volid}->{$last_sync_snapname}) {
353 $logfunc->("incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
354 $base_snapname = $last_sync_snapname;
355 } elsif (defined($parent_snapname) &&
356 ($last_snapshots->{$volid}->{$parent_snapname} &&
357 $remote_snapshots->{$volid}->{$parent_snapname})) {
358 $logfunc->("incremental sync '$volid' ($parent_snapname => $sync_snapname)");
359 $base_snapname = $parent_snapname;
363 $logfunc->("full sync '$volid' ($sync_snapname)") if !defined($base_snapname);
364 replicate_volume
($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
370 $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
371 # we do not cleanup the remote side here - this is done in
372 # next run of prepare_local_job
376 # remove old snapshots because they are no longer needed
377 $cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);
379 remote_finalize_local_job
($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
384 my $run_replication_nolock = sub {
385 my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc) = @_;
387 my $jobid = $jobcfg->{id
};
389 # we normally write errors into the state file,
390 # but we also catch unexpected errors and log them to syslog
391 # (for example when there are problems writing the state file)
393 my $state = PVE
::ReplicationState
::read_job_state
($jobcfg);
395 my $t0 = [gettimeofday
];
398 $state->{ptime
} = PVE
::ProcFSTools
::read_proc_starttime
($state->{pid
});
399 $state->{last_node
} = PVE
::INotify
::nodename
();
400 $state->{last_try
} = $start_time;
401 $state->{last_iteration
} = $iteration;
402 $state->{storeid_list
} //= [];
404 PVE
::ReplicationState
::write_job_state
($jobcfg, $state);
406 mkdir $replicate_logdir;
407 my $logfile = job_logfile_name
($jobid);
408 open(my $logfd, '>', $logfile) ||
409 die "unable to open replication log '$logfile' - $!\n";
# Log helper for one job run: prefixes $msg with the current log time and the
# job id, writes the line to the job's log file handle ($logfd) and, when the
# caller supplied one, also forwards it to $logfunc.
#
# NOTE(review): the argument unpacking was missing from the listing; $msg is
# taken to be the first argument because every visible call passes a single
# string - confirm against upstream.
my $logfunc_wrapper = sub {
    my ($msg) = @_;

    my $ctime = get_log_time();
    print $logfd "$ctime $jobid: $msg\n";
    $logfunc->("$ctime $jobid: $msg") if $logfunc;
};
419 $logfunc_wrapper->("start replication job");
422 replicate
($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
426 $state->{duration
} = tv_interval
($t0);
427 delete $state->{pid
};
428 delete $state->{ptime
};
432 $state->{fail_count
}++;
433 $state->{error
} = "$err";
434 PVE
::ReplicationState
::write_job_state
($jobcfg, $state);
435 $logfunc_wrapper->("end replication job with error: $err");
437 $logfunc_wrapper->("end replication job");
438 $state->{last_sync
} = $start_time;
439 $state->{fail_count
} = 0;
440 delete $state->{error
};
441 PVE
::ReplicationState
::write_job_state
($jobcfg, $state);
447 warn "$jobid: got unexpected replication job error - $err";
451 sub run_replication
{
452 my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_;
455 my $timeout = 2; # do not wait too long - we repeat periodically anyways
456 PVE
::GuestHelpers
::guest_migration_lock
(
457 $jobcfg->{guest
}, $timeout, $run_replication_nolock,
458 $guest_class, $jobcfg, $iteration, $start_time, $logfunc);
461 return undef if $noerr;