]> git.proxmox.com Git - pve-manager.git/blob - PVE/Replication.pm
PVE::Replication - remove dependency to PVE::LXC/PVE::QemuServer
[pve-manager.git] / PVE / Replication.pm
1 package PVE::Replication;
2
3 use warnings;
4 use strict;
5 use Data::Dumper;
6 use JSON;
7 use Time::HiRes qw(gettimeofday tv_interval);
8
9 use PVE::INotify;
10 use PVE::ProcFSTools;
11 use PVE::Tools;
12 use PVE::CalendarEvent;
13 use PVE::Cluster;
14 use PVE::AbstractConfig;
15 use PVE::Storage;
16 use PVE::GuestHelpers;
17 use PVE::ReplicationConfig;
18 use PVE::ReplicationState;
19
# Directory that holds one log file per replication job. Declared with 'our'
# so that code outside this package can read or override it.
our $replicate_logdir = '/var/log/pve/replicate';
21
# Return the path of the log file used for replication job $jobid, i.e. a
# file named after the job id inside $replicate_logdir.
#
# regression tests should overwrite this
sub job_logfile_name {
    my $jobid = shift;

    return $replicate_logdir . '/' . $jobid;
}
28
# Return the timestamp (seconds since the epoch, as given by time()) that is
# prepended to every replication log line.
#
# regression tests should overwrite this
sub get_log_time {
    my $now = time();

    return $now;
}
34
# Collect all replication jobs that have to be handled by the local node.
#
# A job is included when its guest exists in the cluster VM list and is
# located on this node. Jobs that are not flagged with 'remove_job' are
# additionally skipped when they target the local node or are disabled.
#
# Every returned job config is annotated with:
#   state     - job state extracted from the replication state object
#   id        - the job id
#   vmtype    - guest type taken from the cluster VM list
#   next_sync - time of the next sync, or 0 if none is scheduled
#
# Returns a hash reference mapping job id => annotated job config.
# Dies if a job has a type other than 'local'.
sub job_status {

    my $local_node = PVE::INotify::nodename();

    my $stateobj = PVE::ReplicationState::read_state();

    my $cfg = PVE::ReplicationConfig->new();

    my $vmlist = PVE::Cluster::get_vmlist();

    my $result = {};

    for my $jobid (sort keys %{$cfg->{ids}}) {
        my $job = $cfg->{ids}->{$jobid};
        my $vmid = $job->{guest};

        die "internal error - not implemented" if $job->{type} ne 'local';

        # ignore guests which do not exist or live on another node
        my $guest = $vmlist->{ids}->{$vmid};
        next if !$guest;
        next if $guest->{node} ne $local_node;

        # jobs pending removal are always processed; regular jobs must
        # neither sync to the local node nor be disabled
        my $removing = $job->{remove_job};
        next if !$removing && ($job->{target} eq $local_node || $job->{disable});

        my $state = PVE::ReplicationState::extract_job_state($stateobj, $job);
        $job->{state} = $state;
        $job->{id} = $jobid;
        $job->{vmtype} = $guest->{type};

        my $next_sync = 0;
        my $fail_count = $state->{fail_count};

        if ($removing) {
            # lowest possible value
            # todo: consider fail_count? How many retries?
            $next_sync = 1;
        } elsif ($fail_count) {
            # retry with a delay of 5 minutes per failure; after the third
            # failure next_sync stays 0
            $next_sync = $state->{last_try} + 5*60*$fail_count if $fail_count < 3;
        } else {
            my $calspec = PVE::CalendarEvent::parse_calendar_event($job->{schedule} || '*/15');
            $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $state->{last_try}) // 0;
        }

        $job->{next_sync} = $next_sync;

        $result->{$jobid} = $job;
    }

    return $result;
}
95
# Pick the next replication job that is due.
#
# Parameters:
#   $iteration  - value identifying the current scheduler pass; jobs whose
#                 state already records a last_iteration >= $iteration are
#                 skipped
#   $start_time - reference time; a job is due when its next_sync is
#                 non-zero and not later than $start_time
#
# Jobs are examined ordered by last_iteration, then next_sync, then guest
# id (all ascending, all numeric).
#
# Returns the job config hash as produced by job_status(), or undef if no
# job is due.
sub get_next_job {
    my ($iteration, $start_time) = @_;

    my $jobs = job_status();

    my $sort_func = sub {
        my $joba = $jobs->{$a};
        my $jobb = $jobs->{$b};
        my $sa = $joba->{state};
        my $sb = $jobb->{state};
        # last_iteration is a number - compare it numerically. A string
        # comparison ('cmp') would sort e.g. 10 before 9.
        my $res = $sa->{last_iteration} <=> $sb->{last_iteration};
        return $res if $res != 0;
        $res = $joba->{next_sync} <=> $jobb->{next_sync};
        return $res if $res != 0;
        return $joba->{guest} <=> $jobb->{guest};
    };

    foreach my $jobid (sort $sort_func keys %$jobs) {
        my $jobcfg = $jobs->{$jobid};
        # already handled in this (or a later) iteration
        next if $jobcfg->{state}->{last_iteration} >= $iteration;
        if ($jobcfg->{next_sync} && ($start_time >= $jobcfg->{next_sync})) {
            return $jobcfg;
        }
    }

    return undef;
}
123
# Run 'pvesr prepare-local-job' for $jobid on the remote node via ssh.
#
# Parameters:
#   $ssh_info        - passed to PVE::Cluster::ssh_info_to_command() to
#                      build the ssh command prefix
#   $jobid           - replication job id
#   $vmid            - guest id (not used by this function)
#   $volumes         - array ref of volume ids, appended as arguments
#   $storeid_list    - array ref of storage ids, passed comma separated
#                      via '--scan' when non-empty
#   $last_sync       - passed via '--last_sync'
#   $parent_snapname - passed via '--parent_snapname' when true
#   $force           - adds '--force' when true
#   $logfunc         - code ref receiving each stderr line of the command
#
# Every stdout line is decoded as JSON; the value decoded last is returned.
# Dies if the command produced no result.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);

    # assemble the part of the command line executed on the remote side
    my @remote_cmd = ('pvesr', 'prepare-local-job', $jobid);
    push @remote_cmd, '--scan', join(',', @$storeid_list) if @$storeid_list;
    push @remote_cmd, @$volumes if @$volumes;
    push @remote_cmd, '--last_sync', $last_sync;
    push @remote_cmd, '--parent_snapname', $parent_snapname if $parent_snapname;
    push @remote_cmd, '--force' if $force;

    my $cmd = [@$ssh_cmd, '--', @remote_cmd];

    my $remote_snapshots;

    my $decode_output = sub {
        my ($line) = @_;
        $remote_snapshots = JSON::decode_json($line);
    };

    my $log_stderr = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_prepare_local_job) $line");
    };

    PVE::Tools::run_command($cmd, outfunc => $decode_output, errfunc => $log_stderr);

    if (!defined($remote_snapshots)) {
        die "prepare remote node failed - no result\n";
    }

    return $remote_snapshots;
}
157
# Run 'pvesr finalize-local-job' for $jobid on the remote node via ssh.
#
# Parameters:
#   $ssh_info  - passed to PVE::Cluster::ssh_info_to_command() to build the
#                ssh command prefix
#   $jobid     - replication job id
#   $vmid      - guest id (not used by this function)
#   $volumes   - array ref of volume ids, appended as arguments
#   $last_sync - passed via '--last_sync'
#   $logfunc   - code ref receiving each stdout and stderr line
#
# Returns whatever PVE::Tools::run_command() returns.
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);

    my @remote_cmd = ('pvesr', 'finalize-local-job', $jobid, @$volumes, '--last_sync', $last_sync);

    # stdout and stderr are both forwarded to the log
    my $forward_line = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_finalize_local_job) $line");
    };

    return PVE::Tools::run_command(
        [@$ssh_cmd, '--', @remote_cmd],
        outfunc => $forward_line,
        errfunc => $forward_line,
    );
}
173
# Finds the local replication snapshots belonging to $last_sync and removes
# all replication snapshots of this job with other time stamps.
#
# Parameters:
#   $storecfg        - storage configuration, passed to PVE::Storage
#   $volids          - array ref of volume ids to inspect
#   $jobid           - replication job id
#   $last_sync       - time of the last sync (undef is treated as 0)
#   $parent_snapname - optional snapshot name that is kept as well
#   $logfunc         - code ref called with a message for each deletion
#
# A snapshot is kept (and recorded) when its name equals the replication
# snapshot name for $last_sync or equals $parent_snapname. Any other
# snapshot whose name starts with the job's replication prefix is deleted.
#
# Returns, in list context, ($last_snapshots, $cleaned_volumes):
#   $last_snapshots  - { $volid => { $snapname => 1 } } for kept snapshots
#   $cleaned_volumes - { $volid => 1 } for volumes where something was deleted
# In scalar context only $last_snapshots is returned.
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;

    $last_sync //= 0;

    my ($prefix, $keep_snapname) =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);

    # true for snapshots which must survive the cleanup
    my $is_wanted = sub {
        my ($name) = @_;
        return 1 if $name eq $keep_snapname;
        return defined($parent_snapname) && ($name eq $parent_snapname);
    };

    my %kept;
    my %cleaned;

    for my $volid (@$volids) {
        my $snapshots = PVE::Storage::volume_snapshot_list($storecfg, $volid);
        for my $name (@$snapshots) {
            if ($is_wanted->($name)) {
                $kept{$volid}->{$name} = 1;
                next;
            }

            # snapshots not created by this replication job are left alone
            next if $name !~ m/^\Q$prefix\E/;

            $logfunc->("delete stale replication snapshot '$name' on $volid");
            PVE::Storage::volume_snapshot_delete($storecfg, $volid, $name);
            $cleaned{$volid} = 1;
        }
    }

    return wantarray ? (\%kept, \%cleaned) : \%kept;
}
201
# Transfer one volume to the target node using PVE::Storage::storage_migrate.
#
# Parameters:
#   $ssh_info      - connection info of the target node
#   $storecfg      - storage configuration
#   $volid         - volume id to replicate; target storage id and volume
#                    name are the ones parsed from it
#   $base_snapshot - base snapshot name, passed through to storage_migrate
#   $sync_snapname - sync snapshot name, passed through to storage_migrate
#   $rate          - accepted but currently not used
#   $insecure      - accepted but currently not used
#
# Returns whatever storage_migrate() returns.
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure) = @_;

    my ($target_storeid, $target_volname) = PVE::Storage::parse_volume_id($volid);

    # fixme: handle $rate, $insecure ??
    return PVE::Storage::storage_migrate(
        $storecfg,
        $volid,
        $ssh_info,
        $target_storeid,
        $target_volname,
        $base_snapshot,
        $sync_snapname,
    );
}
211
# Remove the job $jobid from the replication configuration.
#
# The configuration is re-read, modified and written back inside the
# callback handed to PVE::ReplicationConfig::lock().
#
# Returns whatever PVE::ReplicationConfig::lock() returns.
sub delete_job {
    my ($jobid) = @_;

    return PVE::ReplicationConfig::lock(sub {
        my $cfg = PVE::ReplicationConfig->new();
        delete $cfg->{ids}->{$jobid};
        $cfg->write();
    });
}
223
# Execute one run of a local replication job (or remove the job).
#
# Parameters:
#   $guest_class - class name of the guest implementation; the methods
#                  load_config, get_replicatable_volumes,
#                  __snapshot_check_freeze_needed and __snapshot_freeze are
#                  called on it
#   $jobcfg      - job config; the keys type, id, guest, vmtype, target,
#                  remove_job and rate are read
#   $state       - job state hash; last_sync and storeid_list are read, and
#                  storeid_list is updated in place
#   $start_time  - start time of this run; used to name the new snapshot
#                  and must be greater than $state->{last_sync}
#   $logfunc     - code ref called with each log message
#
# If $jobcfg->{remove_job} is set, the replication snapshots are removed
# (also on the target node for mode 'full' with a non-local target), the
# job is deleted from the configuration and the function returns.
#
# Otherwise a new snapshot is created on all replicatable volumes and each
# volume is sent to the target node, incrementally if a common base
# snapshot exists on both sides.
#
# Returns nothing. Dies on error.
sub replicate {
    my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;

    my $local_node = PVE::INotify::nodename();

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    # migration settings from datacenter.cfg, defaulting to type 'secure'
    my $migration_network;
    my $migration_type = 'secure';
    if (my $mc = $dc_conf->{migration}) {
        $migration_network = $mc->{network};
        $migration_type = $mc->{type} if defined($mc->{type});
    }

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();
    my $last_sync = $state->{last_sync};

    die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
        if $start_time <= $last_sync;

    my $vmid = $jobcfg->{guest};
    my $vmtype = $jobcfg->{vmtype};

    my $conf = $guest_class->load_config($vmid);
    my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
    my $volumes = $guest_class->get_replicatable_volumes($storecfg, $conf);

    my $sorted_volids = [ sort keys %$volumes ];

    $running //= 0; # to avoid undef warnings from logfunc

    $logfunc->("guest => $vmid, type => $vmtype, running => $running");
    $logfunc->("volumes => " . join(',', @$sorted_volids));

    if (my $remove_job = $jobcfg->{remove_job}) {

        $logfunc->("start job removal - mode '${remove_job}'");

        if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
            # remove all remote volumes
            my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target});
            remote_prepare_local_job($ssh_info, $jobid, $vmid, [], $state->{storeid_list}, 0, undef, 1, $logfunc);

        }
        # remove all local replication snapshots (lastsync => 0)
        prepare($storecfg, $sorted_volids, $jobid, 0, undef, $logfunc);

        delete_job($jobid); # update config
        $logfunc->("job removed");

        return;
    }

    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);

    my $last_sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
    my $sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);

    my $parent_snapname = $conf->{parent};

    # test if we have a replication_ snapshot from last sync
    # and remove all other/stale replication snapshots

    my $last_snapshots = prepare(
        $storecfg, $sorted_volids, $jobid, $last_sync, $parent_snapname, $logfunc);

    # prepare remote side
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, 0, $logfunc);

    # remember the storages used by this run in the job state
    my $storeid_hash = {};
    foreach my $volid (@$sorted_volids) {
        my ($storeid) = PVE::Storage::parse_volume_id($volid);
        $storeid_hash->{$storeid} = 1;
    }
    $state->{storeid_list} = [ sort keys %$storeid_hash ];

    # freeze filesystem for data consistency
    if ($freezefs) {
        $logfunc->("freeze guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 0);
    }

    # make snapshot of all volumes
    # (errors are collected so that the guest gets unfrozen in any case)
    my $replicate_snapshots = {};
    eval {
        foreach my $volid (@$sorted_volids) {
            $logfunc->("create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    my $err = $@;

    # unfreeze immediately
    if ($freezefs) {
        $guest_class->__snapshot_freeze($vmid, 1);
    }

    # best effort removal of snapshot $snapname on all volumes which are
    # keys of $volid_hash; failures are only reported with warn()
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;
        foreach my $volid (sort keys %$volid_hash) {
            $logfunc->("delete previous replication snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname); };
            warn $@ if $@;
        }
    };

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $err;
    }

    eval {

        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        foreach my $volid (@$sorted_volids) {
            my $base_snapname;

            # an incremental sync needs a snapshot which exists on both
            # sides: prefer the one from the last sync, else the parent
            if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) {
                if ($last_snapshots->{$volid}->{$last_sync_snapname} &&
                    $remote_snapshots->{$volid}->{$last_sync_snapname}) {
                    $logfunc->("incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
                    $base_snapname = $last_sync_snapname;
                } elsif (defined($parent_snapname) &&
                         ($last_snapshots->{$volid}->{$parent_snapname} &&
                          $remote_snapshots->{$volid}->{$parent_snapname})) {
                    $logfunc->("incremental sync '$volid' ($parent_snapname => $sync_snapname)");
                    $base_snapname = $parent_snapname;
                }
            }

            $logfunc->("full sync '$volid' ($sync_snapname)") if !defined($base_snapname);
            replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
        }
    };
    $err = $@;

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $err;
    }

    # remove old snapshots because they are no longer needed
    $cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);

    remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);

    # every error has been re-thrown above, so there is nothing left to
    # report at this point
    return;
}
383
# Run a single replication job and record the outcome in the job state.
# The caller is responsible for locking (see run_replication).
#
# Parameters:
#   $guest_class - passed through to replicate()
#   $jobcfg      - job config; $jobcfg->{id} is used as the job id
#   $iteration   - stored in the state as last_iteration
#   $start_time  - stored in the state as last_try and, on success, as
#                  last_sync
#   $logfunc     - optional code ref that additionally receives every log
#                  line written to the job log file
#
# Errors raised by replicate() are stored in the job state (fail_count is
# incremented and the message saved as error). Any other error is only
# reported with warn().
my $run_replication_nolock = sub {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc) = @_;

    my $jobid = $jobcfg->{id};

    # we normally write errors into the state file,
    # but we also catch unexpected errors and report them with warn()
    # (for example when there are problems writing the state file)
    eval {
        my $state = PVE::ReplicationState::read_job_state($jobcfg);
        my $save_state = sub {
            PVE::ReplicationState::write_job_state($jobcfg, $state);
        };

        my $t0 = [gettimeofday];

        # mark the job as running before doing any work
        my $pid = $$;
        $state->{pid} = $pid;
        $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($pid);
        $state->{last_node} = PVE::INotify::nodename();
        $state->{last_try} = $start_time;
        $state->{last_iteration} = $iteration;
        $state->{storeid_list} //= [];

        $save_state->();

        mkdir $replicate_logdir;
        my $logfile = job_logfile_name($jobid);
        open(my $logfd, '>', $logfile)
            or die "unable to open replication log '$logfile' - $!\n";

        # writes to the job log file and forwards to the optional $logfunc
        my $log = sub {
            my ($msg) = @_;

            my $line = get_log_time() . " $jobid: $msg";
            print $logfd "$line\n";
            $logfunc->($line) if $logfunc;
        };

        $log->("start replication job");

        eval { replicate($guest_class, $jobcfg, $state, $start_time, $log); };
        my $err = $@;

        # the job is no longer running
        $state->{duration} = tv_interval($t0);
        delete @{$state}{qw(pid ptime)};

        if (!$err) {
            $log->("end replication job");
            $state->{last_sync} = $start_time;
            $state->{fail_count} = 0;
            delete $state->{error};
            $save_state->();
        } else {
            chomp $err;
            $state->{fail_count}++;
            $state->{error} = "$err";
            $save_state->();
            $log->("end replication job with error: $err");
        }

        close($logfd);
    };
    if (my $err = $@) {
        warn "$jobid: got unexpected replication job error - $err";
    }
};
450
# Run a replication job while holding the guest migration lock.
#
# Parameters:
#   $guest_class, $jobcfg, $iteration, $start_time, $logfunc
#            - handed over unchanged to the locked worker
#   $noerr   - if true, errors (for example failing to get the lock) are
#              swallowed and undef is returned instead of dying
sub run_replication {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_;

    # do not wait too long - we repeat periodically anyways
    my $lock_timeout = 2;

    my @worker_args = ($guest_class, $jobcfg, $iteration, $start_time, $logfunc);

    eval {
        PVE::GuestHelpers::guest_migration_lock(
            $jobcfg->{guest}, $lock_timeout, $run_replication_nolock, @worker_args);
    };
    if (my $err = $@) {
        die $err if !$noerr;
        return undef;
    }
}
465
466 1;