]> git.proxmox.com Git - pve-guest-common.git/blob - PVE/Replication.pm
bump version to 5.1.1
[pve-guest-common.git] / PVE / Replication.pm
1 package PVE::Replication;
2
3 use warnings;
4 use strict;
5 use Data::Dumper;
6 use JSON;
7 use Time::HiRes qw(gettimeofday tv_interval);
8 use POSIX qw(strftime);
9
10 use PVE::INotify;
11 use PVE::ProcFSTools;
12 use PVE::Tools;
13 use PVE::Cluster;
14 use PVE::Storage;
15 use PVE::GuestHelpers;
16 use PVE::ReplicationConfig;
17 use PVE::ReplicationState;
18
19
# Timestamp used as prefix for every replication log line, formatted as
# 'YYYY-MM-DD HH:MM:SS' in local time.
# Regression tests should override this sub to get deterministic output.
sub get_log_time {
    my @now = localtime(time());
    return strftime('%F %H:%M:%S', @now);
}
25
# Find a common base replication snapshot, available on the local and the
# remote side, for every volume in $volumes.
#
# Note: this also removes stale replication snapshots locally (via prepare())
# and prepares the remote side via 'pvesr prepare-local-job'.
#
# Base selection per volume (only when the volume is known on both sides):
#   1. the snapshot of the last sync ($last_sync), if present on both sides;
#   2. otherwise $parent_snapname, if given and present on both sides;
#   3. otherwise, if $last_sync == 0, the newest (by the timestamp embedded
#      in its name) replication snapshot of this job that exists remotely
#      and locally - dies if there is none.
# Volumes without an entry in the result get a full sync by the caller.
#
# Returns ($base_snapshots, $last_snapshots, $last_sync_snapname):
#   $base_snapshots     - { volid => base snapshot name }
#   $last_snapshots     - local snapshots kept by prepare()
#   $last_sync_snapname - snapshot name corresponding to $last_sync
sub find_common_replication_snapshot {
    my ($ssh_info, $jobid, $vmid, $storecfg, $volumes, $storeid_list, $last_sync, $parent_snapname, $logfunc) = @_;

    my $last_sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);

    # test if we have a replication_ snapshot from last sync
    # and remove all other/stale replication snapshots
    my $last_snapshots = prepare(
        $storecfg, $volumes, $jobid, $last_sync, $parent_snapname, $logfunc);

    # prepare remote side
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, 0, $logfunc);

    my $base_snapshots = {};

    foreach my $volid (@$volumes) {
        my $local_snaps = $last_snapshots->{$volid};
        my $remote_snaps = $remote_snapshots->{$volid};

        # no snapshot info on one of the sides -> no common base
        next if !defined($local_snaps) || !defined($remote_snaps);

        if ($local_snaps->{$last_sync_snapname} && $remote_snaps->{$last_sync_snapname}) {
            $base_snapshots->{$volid} = $last_sync_snapname;
        } elsif (defined($parent_snapname)
            && $local_snaps->{$parent_snapname}
            && $remote_snaps->{$parent_snapname}) {
            $base_snapshots->{$volid} = $parent_snapname;
        } elsif ($last_sync == 0) {
            # No known last sync (prepare() notes this happens for a stolen
            # guest): use the newest remote replication snapshot of this job
            # that also exists locally.
            my @desc_sorted_snap =
                map { $_->[1] } sort { $b->[0] <=> $a->[0] }
                map { [ ($_ =~ /__replicate_\Q$jobid\E_(\d+)_/)[0] || 0, $_ ] }
                keys %$remote_snaps;

            foreach my $remote_snap (@desc_sorted_snap) {
                if (defined($local_snaps->{$remote_snap})) {
                    $base_snapshots->{$volid} = $remote_snap;
                    last;
                }
            }
            die "No common base to restore the job state\n".
                "please delete jobid: $jobid and create the job again\n"
                if !defined($base_snapshots->{$volid});
        }
    }

    return ($base_snapshots, $last_snapshots, $last_sync_snapname);
}
78
# Prepare the remote side of a replication job by running
# 'pvesr prepare-local-job' on the target node over SSH.
#
# Parameters:
#   $ssh_info        - SSH connection info for the target node
#   $jobid           - replication job ID
#   $vmid            - guest ID (not used here)
#   $volumes         - array ref of volume IDs appended to the remote command
#   $storeid_list    - array ref of storage IDs, passed as '--scan' when non-empty
#   $last_sync       - passed as '--last_sync'
#   $parent_snapname - passed as '--parent_snapname' when true
#   $force           - adds '--force' when true
#   $logfunc         - logging callback, receives the remote stderr lines
#
# Returns the structure decoded from the JSON printed on the remote stdout
# (every output line is decoded, the last one wins; presumably of the form
# { volid => { snapname => 1 } } - confirm against pvesr). Dies if the remote
# command printed nothing, or if a line is not valid JSON.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;

    my @cmd = (
        @{ PVE::Cluster::ssh_info_to_command($ssh_info) },
        '--', 'pvesr', 'prepare-local-job', $jobid,
    );
    push @cmd, '--scan', join(',', @$storeid_list) if @$storeid_list;
    push @cmd, @$volumes if @$volumes;
    push @cmd, '--last_sync', $last_sync;
    push @cmd, '--parent_snapname', $parent_snapname if $parent_snapname;
    push @cmd, '--force' if $force;

    my $result;

    PVE::Tools::run_command(
        \@cmd,
        outfunc => sub {
            my ($json_line) = @_;
            $result = JSON::decode_json($json_line);
        },
        errfunc => sub {
            my ($stderr_line) = @_;
            chomp $stderr_line;
            $logfunc->("(remote_prepare_local_job) $stderr_line");
        },
    );

    die "prepare remote node failed - no result\n" if !defined($result);

    return $result;
}
112
# Finalize a replication run on the remote node by running
# 'pvesr finalize-local-job <jobid> <volumes...> --last_sync <ts>' over SSH.
# Both stdout and stderr of the remote command are forwarded to $logfunc,
# prefixed with '(remote_finalize_local_job)'. $vmid is not used here.
# Returns whatever PVE::Tools::run_command returns.
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;

    my @cmd = @{ PVE::Cluster::ssh_info_to_command($ssh_info) };
    push @cmd, '--', 'pvesr', 'finalize-local-job', $jobid,
        @$volumes, '--last_sync', $last_sync;

    my $forward_line = sub {
        my ($output_line) = @_;
        chomp $output_line;
        $logfunc->("(remote_finalize_local_job) $output_line");
    };

    PVE::Tools::run_command(\@cmd, outfunc => $forward_line, errfunc => $forward_line);
}
128
# Scan the given volumes for replication snapshots.
#
# Kept (and reported) are the snapshot named after $last_sync for $jobid and
# $parent_snapname. Every other snapshot with the replication prefix is
# considered stale and deleted, unless $last_sync is 0 - then there is no
# known last sync and all replication snapshots are kept and reported.
#
# Parameters:
#   $storecfg        - storage configuration
#   $volids          - array ref of volume IDs to inspect
#   $jobid           - replication job ID; if undef, any '__replicate_' snapshot matches the prefix
#   $last_sync       - timestamp of the last successful sync (undef is treated as 0)
#   $parent_snapname - optional snapshot name that is always kept
#   $logfunc         - logging callback
#
# Returns ($last_snapshots, $cleaned_replicated_volumes) in list context:
#   $last_snapshots             - { volid => { snapname => 1 } } of kept snapshots
#   $cleaned_replicated_volumes - { volid => 1 } for volumes where a stale snapshot was deleted
# In scalar context only $last_snapshots is returned.
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;

    $last_sync //= 0;

    # Without a job ID no exact snapshot name can be derived; only the generic
    # replication prefix is used then.
    my ($prefix, $snapname) = defined($jobid)
        ? PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync)
        : ('__replicate_', undef);

    my $kept = {};
    my $cleaned = {};

    for my $volid (@$volids) {
        my $snapshots = PVE::Storage::volume_snapshot_list($storecfg, $volid);

        for my $snap (@$snapshots) {
            my $is_last_sync = defined($snapname) && $snap eq $snapname;
            my $is_parent = defined($parent_snapname) && $snap eq $parent_snapname;

            if ($is_last_sync || $is_parent) {
                $kept->{$volid}->{$snap} = 1;
                next;
            }

            # not a replication snapshot - leave it alone
            next if $snap !~ m/^\Q$prefix\E/;

            # last_sync == 0 together with existing replication snapshots only
            # occurs if the guest was stolen; keep them as base candidates.
            if ($last_sync == 0) {
                $kept->{$volid}->{$snap} = 1;
                next;
            }

            $logfunc->("delete stale replication snapshot '$snap' on $volid");
            eval {
                PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
                $cleaned->{$volid} = 1;
            };

            # A failed delete may have been a timeout that still succeeded;
            # otherwise the next run will try again. Report, but do not die.
            if (my $delete_err = $@) {
                warn $delete_err; # goes to syslog/journal
                $logfunc->("delete stale replication snapshot error: $delete_err"); # goes to the replication log
            }
        }
    }

    return wantarray ? ($kept, $cleaned) : $kept;
}
180
# Transfer a single volume to the target node via PVE::Storage::storage_migrate.
#
# Parameters:
#   $ssh_info      - SSH connection info for the target node
#   $storecfg      - storage configuration
#   $volid         - volume ID to transfer
#   $base_snapshot - base snapshot for an incremental transfer, undef for a full one
#   $sync_snapname - snapshot to transfer
#   $rate          - optional rate limit; multiplied by 1_000_000 and truncated
#                    to an integer (presumably MB/s -> bytes/s - confirm)
#   $insecure      - passed through to storage_migrate
#   $logfunc       - logging callback
#
# Returns whatever storage_migrate returns.
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure, $logfunc) = @_;

    my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);

    # Do not use 'my $x = ... if COND': perlsyn documents the value of a 'my'
    # with a statement-modifier conditional as undefined.
    my $ratelimit_bps = $rate ? int(1000000 * $rate) : undef;

    PVE::Storage::storage_migrate($storecfg, $volid, $ssh_info, $storeid, $volname,
        $base_snapshot, $sync_snapname, $ratelimit_bps, $insecure, 1, $logfunc);
}
190
191
# Perform one run of a local replication job.
#
# Parameters:
#   $guest_class - guest class providing load_config, get_replicatable_volumes,
#                  guest_type and the __snapshot_* freeze helpers
#   $jobcfg      - job configuration (id, type, guest, target, rate, remove_job)
#   $state       - job state; last_sync and storeid_list are read, storeid_list is updated
#   $start_time  - start time of this run; also names the new replication snapshot
#   $logfunc     - logging callback
#
# If the job is marked for removal (remove_job), its replication snapshots are
# removed locally (and on the target for mode 'full' when the target is not
# this node), the job is deleted from the configuration and undef is returned.
#
# Otherwise all replicatable volumes are snapshotted (with an optional guest
# filesystem freeze), transferred incrementally where a common base snapshot
# exists (full sync otherwise), the previous local replication snapshot is
# removed and the remote side is finalized. Returns the volume hash from
# get_replicatable_volumes. Dies on failure; snapshots created in this run
# are removed locally before the error is re-thrown.
sub replicate {
    my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;

    my $local_node = PVE::INotify::nodename();

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    my $migration_network;
    my $migration_type = 'secure';
    if (my $mc = $dc_conf->{migration}) {
        $migration_network = $mc->{network};
        $migration_type = $mc->{type} if defined($mc->{type});
    }

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();
    my $last_sync = $state->{last_sync};

    die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
        if $start_time <= $last_sync;

    my $vmid = $jobcfg->{guest};

    my $conf = $guest_class->load_config($vmid);
    my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
    my $volumes = $guest_class->get_replicatable_volumes(
        $storecfg, $vmid, $conf, defined($jobcfg->{remove_job}));

    my $sorted_volids = [ sort keys %$volumes ];

    $running //= 0; # to avoid undef warnings from logfunc

    my $guest_name = $guest_class->guest_type() . ' ' . $vmid;

    $logfunc->("guest => $guest_name, running => $running");
    $logfunc->("volumes => " . join(',', @$sorted_volids));

    if (my $remove_job = $jobcfg->{remove_job}) {

        $logfunc->("start job removal - mode '${remove_job}'");

        if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
            # remove all remote volumes
            my @store_list = map { (PVE::Storage::parse_volume_id($_))[0] } @$sorted_volids;

            my %hash = map { $_ => 1 } @store_list;

            my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target});
            remote_prepare_local_job($ssh_info, $jobid, $vmid, [], [ keys %hash ], 1, undef, 1, $logfunc);
        }

        # Remove all local replication snapshots of this job. prepare() keeps
        # only the snapshot named after $last_sync (plus the parent, undef
        # here) and deletes every other replication snapshot when $last_sync
        # is non-zero, so passing 1 - a timestamp no real sync uses - removes
        # them all. (Passing 0 would keep them instead.)
        prepare($storecfg, $sorted_volids, $jobid, 1, undef, $logfunc);

        PVE::ReplicationConfig::delete_job($jobid); # update config
        $logfunc->("job removed");

        return undef;
    }

    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);

    my $parent_snapname = $conf->{parent};

    my ($base_snapshots, $last_snapshots, $last_sync_snapname) = find_common_replication_snapshot(
        $ssh_info, $jobid, $vmid, $storecfg, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, $logfunc);

    # remember the storages in use; the next run passes them to the remote side as '--scan'
    my $storeid_hash = {};
    foreach my $volid (@$sorted_volids) {
        my ($storeid) = PVE::Storage::parse_volume_id($volid);
        $storeid_hash->{$storeid} = 1;
    }
    $state->{storeid_list} = [ sort keys %$storeid_hash ];

    # freeze filesystem for data consistency
    if ($freezefs) {
        $logfunc->("freeze guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 0);
    }

    # make snapshot of all volumes
    my $sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);

    my $replicate_snapshots = {};
    eval {
        foreach my $volid (@$sorted_volids) {
            $logfunc->("create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    my $err = $@;

    # thaw immediately
    if ($freezefs) {
        $logfunc->("thaw guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 1);
    }

    # Best effort: delete $snapname on every volume that is a key of
    # $volid_hash; failures are only warned about.
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;
        foreach my $volid (sort keys %$volid_hash) {
            $logfunc->("delete previous replication snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname); };
            warn $@ if $@;
        }
    };

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $err;
    }

    eval {
        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        foreach my $volid (@$sorted_volids) {
            my $base_snapname = $base_snapshots->{$volid};

            if (defined($base_snapname)) {
                $logfunc->("incremental sync '$volid' ($base_snapname => $sync_snapname)");
            } else {
                $logfunc->("full sync '$volid' ($sync_snapname)");
            }

            replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure, $logfunc);
        }
    };

    if ($err = $@) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $err;
    }

    # Remove the previous replication snapshot, it is no longer needed. Only
    # volumes that actually carry it are touched: $last_snapshots may also
    # list volumes holding just the parent snapshot or (with $last_sync == 0)
    # older replication snapshots, where $last_sync_snapname does not exist
    # and a delete attempt would only produce a warning.
    my $volids_with_last_sync_snap = {};
    foreach my $volid (keys %$last_snapshots) {
        $volids_with_last_sync_snap->{$volid} = 1
            if $last_snapshots->{$volid}->{$last_sync_snapname};
    }
    $cleanup_local_snapshots->($volids_with_last_sync_snap, $last_sync_snapname);

    eval {
        remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
    };

    # Not fatal: old remote snapshots will be removed by the next run of
    # prepare_local_job.
    if ($err = $@) {
        # warn is for syslog/journal.
        warn $err;

        # logfunc writes into the replication log.
        $logfunc->("delete stale replication snapshot error: $err");
    }

    return $volumes;
}
350
# Run one replication job without taking any lock (run_replication wraps this
# in the guest migration lock).
#
# Reads the job state, records the job start, writes a fresh per-job log file
# (every message is also forwarded to $logfunc, with timestamp and job ID when
# $verbose is set), runs replicate() and records the job end including the
# elapsed time and any error.
#
# Errors from replicate() are normally written into the state file; they are
# re-thrown afterwards. Errors outside of that (for example problems writing
# the state file) propagate to the caller unchanged.
#
# Returns the volumes returned by replicate().
my $run_replication_nolock = sub {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;

    my $jobid = $jobcfg->{id};

    my $state = PVE::ReplicationState::read_job_state($jobcfg);

    PVE::ReplicationState::record_job_start($jobcfg, $state, $start_time, $iteration);

    my $t0 = [gettimeofday];

    mkdir $PVE::ReplicationState::replicate_logdir;
    my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
    open(my $logfd, '>', $logfile)
        or die "unable to open replication log '$logfile' - $!\n";

    my $job_log = sub {
        my ($msg) = @_;

        my $stamped = get_log_time() . " $jobid: $msg";
        print $logfd "$stamped\n";

        $logfunc->($verbose ? $stamped : $msg) if $logfunc;
    };

    $job_log->("start replication job");

    my $volumes = eval {
        replicate($guest_class, $jobcfg, $state, $start_time, $job_log);
    };
    my $err = $@;

    if (!$err) {
        $job_log->("end replication job");
    } else {
        chomp(my $msg = "end replication job with error: $err");
        $job_log->($msg);
    }

    PVE::ReplicationState::record_job_end($jobcfg, $state, $start_time, tv_interval($t0), $err);

    close($logfd);

    die $err if $err;

    return $volumes;
};
410
# Run a replication job while holding the guest migration lock.
# The lock wait is kept short (timeout 2) because jobs are repeated
# periodically anyway. Returns the volumes reported by the job run.
sub run_replication {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;

    my $lock_timeout = 2; # do not wait too long - we repeat periodically anyways

    my $replicated = PVE::GuestHelpers::guest_migration_lock(
        $jobcfg->{guest}, $lock_timeout, $run_replication_nolock,
        $guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose);

    return $replicated;
}
423
424 1;