1 package PVE
::Replication
;
7 use Time
::HiRes
qw(gettimeofday tv_interval);
12 use PVE
::CalendarEvent
;
14 use PVE
::AbstractConfig
;
20 use PVE
::GuestHelpers
;
21 use PVE
::ReplicationConfig
;
23 # Note: regression tests can overwrite $state_path for testing
24 our $state_path = "/var/lib/pve-manager/pve-replication-state.json";
26 my $update_job_state = sub {
27 my ($stateobj, $jobcfg, $state) = @_;
29 my $plugin = PVE
::ReplicationConfig-
>lookup($jobcfg->{type
});
31 my $vmid = $jobcfg->{guest
};
32 my $tid = $plugin->get_unique_target_id($jobcfg);
34 # Note: tuple ($vmid, $tid) is unique
35 $stateobj->{$vmid}->{$tid} = $state;
37 PVE
::Tools
::file_set_contents
($state_path, encode_json
($stateobj));
40 my $get_job_state = sub {
41 my ($stateobj, $jobcfg) = @_;
43 my $plugin = PVE
::ReplicationConfig-
>lookup($jobcfg->{type
});
45 my $vmid = $jobcfg->{guest
};
46 my $tid = $plugin->get_unique_target_id($jobcfg);
47 my $state = $stateobj->{$vmid}->{$tid};
49 $state = {} if !$state;
51 $state->{last_iteration
} //= 0;
52 $state->{last_try
} //= 0; # last sync start time
53 $state->{last_sync
} //= 0; # last successful sync start time
54 $state->{fail_count
} //= 0;
59 my $read_state = sub {
61 return {} if ! -e
$state_path;
63 my $raw = PVE
::Tools
::file_get_contents
($state_path);
65 return {} if $raw eq '';
67 return decode_json
($raw);
73 my $local_node = PVE
::INotify
::nodename
();
77 $stateobj = $read_state->() if !$stateobj;
79 my $cfg = PVE
::ReplicationConfig-
>new();
81 my $vms = PVE
::Cluster
::get_vmlist
();
83 foreach my $jobid (sort keys %{$cfg->{ids
}}) {
84 my $jobcfg = $cfg->{ids
}->{$jobid};
85 my $vmid = $jobcfg->{guest
};
87 die "internal error - not implemented" if $jobcfg->{type
} ne 'local';
89 # skip non existing vms
90 next if !$vms->{ids
}->{$vmid};
92 # only consider guest on local node
93 next if $vms->{ids
}->{$vmid}->{node
} ne $local_node;
95 # never sync to local node
96 next if $jobcfg->{target
} eq $local_node;
98 next if $jobcfg->{disable
};
100 my $state = $get_job_state->($stateobj, $jobcfg);
101 $jobcfg->{state} = $state;
102 $jobcfg->{id
} = $jobid;
103 $jobcfg->{vmtype
} = $vms->{ids
}->{$vmid}->{type
};
106 if (my $fail_count = $state->{fail_count
}) {
107 if ($fail_count < 3) {
108 $next_sync = $state->{last_try
} + 5*60*$fail_count;
111 my $schedule = $jobcfg->{schedule
} || '*/15';
112 my $calspec = PVE
::CalendarEvent
::parse_calendar_event
($schedule);
113 $next_sync = PVE
::CalendarEvent
::compute_next_event
($calspec, $state->{last_try
}) // 0;
115 $jobcfg->{next_sync
} = $next_sync;
117 $jobs->{$jobid} = $jobcfg;
123 my $get_next_job = sub {
124 my ($stateobj, $iteration, $start_time) = @_;
128 my $jobs = job_status
($stateobj);
130 my $sort_func = sub {
131 my $joba = $jobs->{$a};
132 my $jobb = $jobs->{$b};
133 my $sa = $joba->{state};
134 my $sb = $jobb->{state};
135 my $res = $sa->{last_iteration
} cmp $sb->{last_iteration
};
136 return $res if $res != 0;
137 $res = $joba->{next_sync
} <=> $jobb->{next_sync
};
138 return $res if $res != 0;
139 return $joba->{guest
} <=> $jobb->{guest
};
142 foreach my $jobid (sort $sort_func keys %$jobs) {
143 my $jobcfg = $jobs->{$jobid};
144 next if $jobcfg->{state}->{last_iteration
} >= $iteration;
145 if ($jobcfg->{next_sync
} && ($start_time >= $jobcfg->{next_sync
})) {
146 $next_jobid = $jobid;
151 return undef if !$next_jobid;
153 my $jobcfg = $jobs->{$next_jobid};
155 $jobcfg->{state}->{last_iteration
} = $iteration;
156 $update_job_state->($stateobj, $jobcfg, $jobcfg->{state});
161 sub replication_snapshot_name
{
162 my ($jobid, $last_sync) = @_;
164 my $prefix = "replicate_${jobid}_";
165 my $snapname = "${prefix}${last_sync}_snap";
167 wantarray ?
($prefix, $snapname) : $snapname;
170 sub remote_prepare_local_job
{
171 my ($ssh_info, $jobid, $vmid, $volumes, $last_sync) = @_;
173 my $ssh_cmd = PVE
::Cluster
::ssh_info_to_command
($ssh_info);
174 my $cmd = [@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid,
175 $vmid, @$volumes, '--last_sync', $last_sync];
177 my $remote_snapshots;
181 $remote_snapshots = JSON
::decode_json
($line);
184 PVE
::Tools
::run_command
($cmd, outfunc
=> $parser);
186 die "prepare remote node failed - no result\n"
187 if !defined($remote_snapshots);
189 return $remote_snapshots;
192 sub remote_finalize_local_job
{
193 my ($ssh_info, $jobid, $vmid, $volumes, $last_sync) = @_;
195 my $ssh_cmd = PVE
::Cluster
::ssh_info_to_command
($ssh_info);
196 my $cmd = [@$ssh_cmd, '--', 'pvesr', 'finalize-local-job', $jobid,
197 $vmid, @$volumes, '--last_sync', $last_sync];
199 PVE
::Tools
::run_command
($cmd);
203 my ($storecfg, $volids, $jobid, $last_sync, $start_time, $logfunc) = @_;
205 my ($prefix, $snapname) = replication_snapshot_name
($jobid, $last_sync);
207 my $last_snapshots = {};
208 foreach my $volid (@$volids) {
209 my $list = PVE
::Storage
::volume_snapshot_list
($storecfg, $volid, $prefix);
211 foreach my $snap (@$list) {
212 if ($snap eq $snapname) {
213 $last_snapshots->{$volid} = 1;
215 $logfunc->($start_time, "$jobid: delete stale snapshot '$snap' on $volid");
216 PVE
::Storage
::volume_snapshot_delete
($storecfg, $volid, $snap);
221 return $last_snapshots;
224 sub replicate_volume
{
225 my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure) = @_;
227 my ($storeid, $volname) = PVE
::Storage
::parse_volume_id
($volid);
229 # fixme: handle $rate, $insecure ??
230 PVE
::Storage
::storage_migrate
($storecfg, $volid, $ssh_info, $storeid, $volname,
231 $base_snapshot, $sync_snapname);
235 my ($jobcfg, $last_sync, $start_time, $logfunc) = @_;
237 $logfunc = sub {} if !$logfunc; # log nothing by default
239 my $local_node = PVE
::INotify
::nodename
();
241 die "not implemented - internal error" if $jobcfg->{type
} ne 'local';
243 my $dc_conf = PVE
::Cluster
::cfs_read_file
('datacenter.cfg');
245 my $migration_network;
246 my $migration_type = 'secure';
247 if (my $mc = $dc_conf->{migration
}) {
248 $migration_network = $mc->{network
};
249 $migration_type = $mc->{type
} if defined($mc->{type
});
252 my $ssh_info = PVE
::Cluster
::get_ssh_info
($jobcfg->{target
}, $migration_network);
254 my $jobid = $jobcfg->{id
};
255 my $storecfg = PVE
::Storage
::config
();
257 die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
258 if $start_time <= $last_sync;
260 my $vmid = $jobcfg->{guest
};
261 my $vmtype = $jobcfg->{vmtype
};
268 if ($vmtype eq 'qemu') {
269 $conf = PVE
::QemuConfig-
>load_config($vmid);
270 $running = PVE
::QemuServer
::check_running
($vmid);
271 $qga = PVE
::QemuServer
::qga_check_running
($vmid)
272 if $running && $conf->{agent
};
273 $volumes = PVE
::QemuConfig-
>get_replicatable_volumes($storecfg, $conf);
274 } elsif ($vmtype eq 'lxc') {
275 $conf = PVE
::LXC
::Config-
>load_config($vmid);
276 $running = PVE
::LXC
::check_running
($vmid);
277 $volumes = PVE
::LXC
::Config-
>get_replicatable_volumes($storecfg, $conf);
279 die "internal error";
282 my $sorted_volids = [ sort keys %$volumes ];
284 $logfunc->($start_time, "$jobid: guest => $vmid, type => $vmtype, running => $running");
285 $logfunc->($start_time, "$jobid: volumes => " . join(',', @$sorted_volids));
287 # prepare remote side
288 my $remote_snapshots = remote_prepare_local_job
(
289 $ssh_info, $jobid, $vmid, $volumes, $last_sync);
291 # test if we have a replication_ snapshot from last sync
292 # and remove all other/stale replication snapshots
293 my $last_sync_snapname = replication_snapshot_name
($jobid, $last_sync);
294 my $sync_snapname = replication_snapshot_name
($jobid, $start_time);
296 my $last_snapshots = prepare
(
297 $storecfg, $sorted_volids, $jobid, $last_sync, $start_time, $logfunc);
299 # freeze filesystem for data consistency
301 $logfunc->($start_time, "$jobid: freeze guest filesystem");
302 PVE
::QemuServer
::vm_mon_cmd
($vmid, "guest-fsfreeze-freeze");
305 # make snapshot of all volumes
306 my $replicate_snapshots = {};
308 foreach my $volid (@$sorted_volids) {
309 $logfunc->($start_time, "$jobid: create snapshot '${sync_snapname}' on $volid");
310 PVE
::Storage
::volume_snapshot
($storecfg, $volid, $sync_snapname);
311 $replicate_snapshots->{$volid} = 1;
316 # unfreeze immediately
318 $logfunc->($start_time, "$jobid: unfreeze guest filesystem");
319 eval { PVE
::QemuServer
::vm_mon_cmd
($vmid, "guest-fsfreeze-thaw"); };
320 warn $@ if $@; # ignore errors here, because we cannot fix it anyways
323 my $cleanup_local_snapshots = sub {
324 my ($volid_hash, $snapname) = @_;
325 foreach my $volid (sort keys %$volid_hash) {
326 $logfunc->($start_time, "$jobid: delete snapshot '$snapname' on $volid");
327 eval { PVE
::Storage
::volume_snapshot_delete
($storecfg, $volid, $snapname, $running); };
333 $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
339 my $rate = $jobcfg->{rate
};
340 my $insecure = $migration_type eq 'insecure';
342 foreach my $volid (@$sorted_volids) {
344 if ($last_snapshots->{$volid} && $remote_snapshots->{$volid}) {
345 $logfunc->($start_time, "$jobid: incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
346 $base_snapname = $last_sync_snapname;
348 $logfunc->($start_time, "$jobid: full sync '$volid' ($sync_snapname)");
350 replicate_volume
($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
356 $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
357 # we do not cleanup the remote side here - this is done in
358 # next run of prepare_local_job
362 # remove old snapshots because they are no longer needed
363 $cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);
365 remote_finalize_local_job
($ssh_info, $jobid, $vmid, $sorted_volids, $start_time);
370 my $run_replication = sub {
371 my ($stateobj, $jobcfg, $start_time, $logfunc) = @_;
373 my $state = delete $jobcfg->{state};
375 my $t0 = [gettimeofday
];
377 # cleanup stale pid/ptime state
378 foreach my $vmid (keys %$stateobj) {
379 foreach my $tid (keys %{$stateobj->{$vmid}}) {
380 my $state = $stateobj->{$vmid}->{$tid};
381 delete $state->{pid
};
382 delete $state->{ptime
};
387 $state->{ptime
} = PVE
::ProcFSTools
::read_proc_starttime
($state->{pid
});
388 $state->{last_try
} = $start_time;
389 $update_job_state->($stateobj, $jobcfg, $state);
391 $logfunc->($start_time, "$jobcfg->{id}: start replication job") if $logfunc;
394 my $timeout = 2; # do not wait too long - we repeat periodically anyways
395 PVE
::GuestHelpers
::guest_migration_lock
(
396 $jobcfg->{guest
}, $timeout, \
&replicate
,
397 $jobcfg, $state->{last_sync
}, $start_time, $logfunc);
401 $state->{duration
} = tv_interval
($t0);
402 delete $state->{pid
};
403 delete $state->{ptime
};
406 $state->{fail_count
}++;
407 $state->{error
} = "$err";
408 $update_job_state->($stateobj, $jobcfg, $state);
411 $logfunc->($start_time, "$jobcfg->{id}: end replication job with error: $err");
416 $logfunc->($start_time, "$jobcfg->{id}: end replication job") if $logfunc;
417 $state->{last_sync
} = $start_time;
418 $state->{fail_count
} = 0;
419 delete $state->{error
};
420 $update_job_state->($stateobj, $jobcfg, $state);
425 my ($jobid, $now, $logfunc) = @_; # passing $now useful for regression testing
427 my $local_node = PVE
::INotify
::nodename
();
432 my $stateobj = $read_state->();
434 my $cfg = PVE
::ReplicationConfig-
>new();
436 my $jobcfg = $cfg->{ids
}->{$jobid};
437 die "no such job '$jobid'\n" if !$jobcfg;
439 die "internal error - not implemented" if $jobcfg->{type
} ne 'local';
441 die "job '$jobid' is disabled\n" if $jobcfg->{disable
};
443 my $vms = PVE
::Cluster
::get_vmlist
();
444 my $vmid = $jobcfg->{guest
};
446 die "no such guest '$vmid'\n" if !$vms->{ids
}->{$vmid};
448 die "guest '$vmid' is not on local node\n"
449 if $vms->{ids
}->{$vmid}->{node
} ne $local_node;
451 die "unable to sync to local node\n" if $jobcfg->{target
} eq $local_node;
453 $jobcfg->{state} = $get_job_state->($stateobj, $jobcfg);
454 $jobcfg->{id
} = $jobid;
455 $jobcfg->{vmtype
} = $vms->{ids
}->{$vmid}->{type
};
457 $jobcfg->{state}->{last_iteration
} = $now;
458 $update_job_state->($stateobj, $jobcfg, $jobcfg->{state});
460 $run_replication->($stateobj, $jobcfg, $now, $logfunc);
463 my $res = PVE
::Tools
::lock_file
($state_path, 60, $code);
468 my ($now, $logfunc) = @_; # useful for regression testing
470 my $iteration = $now // time();
473 my $stateobj = $read_state->();
474 my $start_time = $now // time();
476 while (my $jobcfg = $get_next_job->($stateobj, $iteration, $start_time)) {
477 $run_replication->($stateobj, $jobcfg, $start_time, $logfunc);
478 $start_time = $now // time();
482 my $res = PVE
::Tools
::lock_file
($state_path, 60, $code);