]> git.proxmox.com Git - pve-guest-common.git/blame - src/PVE/Replication.pm
replication: prepare: include volumes without snapshots in the result
[pve-guest-common.git] / src / PVE / Replication.pm
CommitLineData
a6538c1e
DM
1package PVE::Replication;
2
3use warnings;
4use strict;
5use Data::Dumper;
6use JSON;
7use Time::HiRes qw(gettimeofday tv_interval);
93c3695b 8use POSIX qw(strftime);
a6538c1e
DM
9
10use PVE::INotify;
11use PVE::ProcFSTools;
12use PVE::Tools;
13use PVE::Cluster;
96c08a9d 14use PVE::DataCenterConfig;
a6538c1e
DM
15use PVE::Storage;
16use PVE::GuestHelpers;
17use PVE::ReplicationConfig;
18use PVE::ReplicationState;
0c85474f 19use PVE::SSHInfo;
a6538c1e
DM
20
21
# Timestamp used as prefix for replication log lines, formatted with the
# strftime pattern "%F %H:%M:%S" in local time.
# Regression tests should overwrite this sub.
sub get_log_time {
    my @now = localtime();

    return strftime("%F %H:%M:%S", @now);
}
27
e4f63016
DM
# Find, for each volume, a base snapshot that is available on both the local
# and the remote side and can therefore be used for an incremental sync.
#
# Note: this calls prepare() locally and remote_prepare_local_job() on the
# target, which also removes stale replication snapshots.
#
# Candidates are tried in this order:
#   1. the snapshot of the last sync,
#   2. the guest's current parent snapshot (if any),
#   3. the most recent (by local timestamp) common snapshot that is either
#      listed in the guest config or is a replication snapshot.
#
# Returns the list ($base_snapshots, $local_snapshots, $last_sync_snapname);
# $base_snapshots maps volid => snapshot name. Volumes unknown to one of the
# two sides get no entry. Dies if a volume is known to both sides but no
# common snapshot could be found.
sub find_common_replication_snapshot {
    my ($ssh_info, $jobid, $vmid, $storecfg, $volumes, $storeid_list, $last_sync, $guest_conf, $logfunc) = @_;

    my $parent_snapname = $guest_conf->{parent};
    my $conf_snapshots = $guest_conf->{snapshots};

    my $last_sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);

    # scalar context on purpose - only the snapshot info is of interest here
    my $local_snapshots =
        prepare($storecfg, $volumes, $jobid, $last_sync, $parent_snapname, $logfunc);

    # prepare remote side (no --force)
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, 0, $logfunc,
    );

    my $base_snapshots = {};

    for my $volid (@$volumes) {
        my $local_info = $local_snapshots->{$volid};
        my $remote_info = $remote_snapshots->{$volid};

        next if !defined($local_info) || !defined($remote_info);

        # true if the snapshot exists on both sides (and has the same ID there)
        my $is_common = sub {
            my ($snap) = @_;

            my $local = $local_info->{$snap};
            my $remote = $remote_info->{$snap};

            return 0 if !$local || !$remote;

            # Check for ID if remote side supports it already.
            return $local->{id} eq $remote->{id} if ref($remote) eq 'HASH';

            return 1;
        };

        if ($is_common->($last_sync_snapname)) {
            $base_snapshots->{$volid} = $last_sync_snapname;
            next;
        }

        if (defined($parent_snapname) && $is_common->($parent_snapname)) {
            $base_snapshots->{$volid} = $parent_snapname;
            next;
        }

        # fall back to the newest snapshot both sides agree on
        my ($newest_time, $newest_snap) = (0, undef);
        for my $snapshot (keys $local_info->%*) {
            next if !$is_common->($snapshot);
            next if !$conf_snapshots->{$snapshot} && !is_replication_snapshot($snapshot);

            my $timestamp = $local_info->{$snapshot}->{timestamp};

            ($newest_time, $newest_snap) = ($timestamp, $snapshot) if $timestamp > $newest_time;
        }

        if ($newest_snap) {
            $base_snapshots->{$volid} = $newest_snap;
            next;
        }

        # The volume exists on the remote side, so trying a full sync won't work.
        # Die early with a clean error.
        die "No common base to restore the job state\n".
            "please delete jobid: $jobid and create the job again\n"
            if !defined($base_snapshots->{$volid});
    }

    return ($base_snapshots, $local_snapshots, $last_sync_snapname);
}
105
a6538c1e
DM
# Run 'pvesr prepare-local-job' for $jobid on the node described by $ssh_info.
#
# $storeid_list is passed as comma separated '--scan' list (if not empty),
# $volumes as positional arguments. '--parent_snapname' and '--force' are only
# added when the respective parameter is true. $vmid is currently not used.
#
# Output passed to the 'outfunc' callback is decoded as JSON, the result of the
# last decode is returned. Output passed to 'errfunc' is forwarded to $logfunc.
# Dies if no result was received.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;

    my $ssh_cmd = PVE::SSHInfo::ssh_info_to_command($ssh_info);

    my @cmd = (@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid);

    if (scalar(@$storeid_list)) {
        push @cmd, '--scan', join(',', @$storeid_list);
    }
    if (scalar(@$volumes)) {
        push @cmd, @$volumes;
    }

    push @cmd, '--last_sync', $last_sync;

    if ($parent_snapname) {
        push @cmd, '--parent_snapname', $parent_snapname;
    }
    if ($force) {
        push @cmd, '--force';
    }

    my $remote_snapshots;

    my $decode_result = sub {
        my ($line) = @_;
        $remote_snapshots = JSON::decode_json($line);
    };

    my $log_error_output = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_prepare_local_job) $line");
    };

    PVE::Tools::run_command(\@cmd, outfunc => $decode_result, errfunc => $log_error_output);

    if (!defined($remote_snapshots)) {
        die "prepare remote node failed - no result\n";
    }

    return $remote_snapshots;
}
139
# Run 'pvesr finalize-local-job' for $jobid on the node described by $ssh_info,
# passing the volume IDs and '--last_sync'. Both output callbacks forward to
# $logfunc. $vmid is currently not used.
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;

    my $ssh_cmd = PVE::SSHInfo::ssh_info_to_command($ssh_info);

    my @cmd = (@$ssh_cmd, '--', 'pvesr', 'finalize-local-job', $jobid);
    push @cmd, @$volumes, '--last_sync', $last_sync;

    my $forward_to_log = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_finalize_local_job) $line");
    };

    PVE::Tools::run_command(\@cmd, outfunc => $forward_to_log, errfunc => $forward_to_log);
}
155
c0b29481
FE
# Collect the snapshot info of all given volumes and remove stale replication
# snapshots, i.e. those sharing the job's prefix (or the generic
# '__replicate_' prefix if no $jobid is given) that are neither the
# $last_sync snapshot nor $parent_snapname.
#
# Removal only happens if the expected $last_sync snapshot is present on the
# volume, with two special values:
#   last_sync = 0 (or undef): never remove anything (useful if VM was stolen)
#   last_sync = 1:            remove all replication snapshots (limited to the
#                             job if $jobid is specified)
#
# Returns a hash mapping volid => { snapname => info } of the snapshots that
# were kept. In list context a second hash is returned, flagging the volumes
# on which at least one snapshot was deleted successfully.
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;

    $last_sync //= 0;

    my $prefix = '__replicate_';
    my $snapname;
    if (defined($jobid)) {
        ($prefix, $snapname) = PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
    }

    my $local_snapshots = {};
    my $cleaned_replicated_volumes = {};

    for my $volid (@$volids) {
        $local_snapshots->{$volid} = {};

        my $info = PVE::Storage::volume_snapshot_info($storecfg, $volid);

        my $removal_ok = !defined($snapname) || $info->{$snapname};
        if ($last_sync == 0) {
            $removal_ok = 0; # last_sync=0 if the VM was stolen, don't remove!
        } elsif ($last_sync == 1) {
            $removal_ok = 1; # last_sync=1 is a special value used to remove all
        }

        # replication snapshot with the same $prefix, but not the $last_sync one?
        my $is_stale_candidate = sub {
            my ($snap) = @_;

            return 0 if defined($snapname) && $snap eq $snapname;
            return 0 if defined($parent_snapname) && $snap eq $parent_snapname;
            return $snap =~ m/^\Q$prefix\E/;
        };

        if (!$removal_ok && $last_sync > 1 && grep { $is_stale_candidate->($_) } keys $info->%*) {
            $logfunc->("expected snapshot $snapname not present for $volid, not removing others");
        }

        for my $snap (keys $info->%*) {
            if (!$removal_ok || !$is_stale_candidate->($snap)) {
                $local_snapshots->{$volid}->{$snap} = $info->{$snap};
                next;
            }

            $logfunc->("delete stale replication snapshot '$snap' on $volid");
            eval {
                PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
                $cleaned_replicated_volumes->{$volid} = 1;
            };

            # If deleting the snapshot fails, we cannot be sure whether it was due to an error
            # or a timeout. With a timeout it is likely that the removal worked out anyway.
            # If it really failed, the next run will try to remove the snapshot again.
            if (my $err = $@) {
                # warn is for syslog/journal.
                warn $err;

                # logfunc writes to the replication log.
                $logfunc->("delete stale replication snapshot error: $err");
            }
        }
    }

    return ($local_snapshots, $cleaned_replicated_volumes) if wantarray;
    return $local_snapshots;
}
221
# Send a single volume, including its snapshots, to the same storage ID on the
# node described by $ssh_info via PVE::Storage::storage_migrate().
#
# $base_snapshot is passed on as 'base_snapshot' (may be undef), $sync_snapname
# as 'snapshot'. A true $rate is multiplied by 1000000 and passed on as
# 'ratelimit_bps', otherwise no limit is set.
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure, $logfunc) = @_;

    my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);

    my $ratelimit_bps;
    $ratelimit_bps = int(1000000 * $rate) if $rate;

    my %migrate_opts = (
        target_volname => $volname,
        base_snapshot => $base_snapshot,
        snapshot => $sync_snapname,
        ratelimit_bps => $ratelimit_bps,
        insecure => $insecure,
        with_snapshots => 1,
    );

    PVE::Storage::storage_migrate($storecfg, $volid, $ssh_info, $storeid, \%migrate_opts, $logfunc);
}
240
241
# Run one replication (or job removal) for the job described by $jobcfg.
#
# Normal run:
#   - determine the common base snapshots (this also cleans up stale
#     replication snapshots on both sides),
#   - snapshot all replicatable volumes, with the guest filesystem frozen if
#     the guest class requests it,
#   - send each volume to the target (incremental if a base exists),
#   - record the sync in $state, then drop the previous replication snapshot
#     locally and finalize the job on the target.
#   Returns the hash of replicated volumes.
#
# If $jobcfg->{remove_job} is set, the replication snapshots are removed
# (also on the target for mode 'full'), the job is deleted from the config,
# and undef is returned.
#
# Dies on errors. Snapshots created by a failed run are cleaned up locally;
# the remote side is cleaned up by the next run.
sub replicate {
    my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;

    my $local_node = PVE::INotify::nodename();

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    my ($migration_network, $migration_type) = (undef, 'secure');
    if (my $mc = $dc_conf->{migration}) {
        $migration_network = $mc->{network};
        $migration_type = $mc->{type} // $migration_type;
    }

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();
    my $last_sync = $state->{last_sync};

    if ($start_time <= $last_sync) {
        die "start time before last sync ($start_time <= $last_sync) - abort sync\n";
    }

    my $vmid = $jobcfg->{guest};

    my $conf = $guest_class->load_config($vmid);
    my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
    my $volumes = $guest_class->get_replicatable_volumes($storecfg, $vmid, $conf, defined($jobcfg->{remove_job}));

    my $sorted_volids = [ sort keys %$volumes ];

    $running //= 0; # to avoid undef warnings from logfunc

    my $guest_name = $guest_class->guest_type() . ' ' . $vmid;

    $logfunc->("guest => $guest_name, running => $running");
    $logfunc->("volumes => " . join(',', @$sorted_volids));

    # filter out left-over non-existing/removed storages - avoids error on target
    $state->{storeid_list} = [ grep { $storecfg->{ids}->{$_} } $state->{storeid_list}->@* ];

    if (my $remove_job = $jobcfg->{remove_job}) {

        $logfunc->("start job removal - mode '${remove_job}'");

        if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
            # remove all remote volumes - scan the storages of the current
            # volumes as well as those recorded in the job state
            my %scan_storages = map { $_ => 1 } (
                (map { (PVE::Storage::parse_volume_id($_))[0] } @$sorted_volids),
                @{$state->{storeid_list}},
            );

            my $ssh_info = PVE::SSHInfo::get_ssh_info($jobcfg->{target});
            remote_prepare_local_job(
                $ssh_info, $jobid, $vmid, [], [ keys %scan_storages ], 1, undef, 1, $logfunc);
        }

        # remove all local replication snapshots of this job (last_sync = 1 is
        # the special value for that)
        prepare($storecfg, $sorted_volids, $jobid, 1, undef, $logfunc);

        PVE::ReplicationConfig::delete_job($jobid); # update config
        $logfunc->("job removed");

        return undef;
    }

    my $ssh_info = PVE::SSHInfo::get_ssh_info($jobcfg->{target}, $migration_network);

    my ($base_snapshots, $local_snapshots, $last_sync_snapname) = find_common_replication_snapshot(
        $ssh_info,
        $jobid,
        $vmid,
        $storecfg,
        $sorted_volids,
        $state->{storeid_list},
        $last_sync,
        $conf,
        $logfunc,
    );

    # remember the storages used by this run in the job state
    my %used_storages;
    for my $volid (@$sorted_volids) {
        my ($storeid) = PVE::Storage::parse_volume_id($volid);
        $used_storages{$storeid} = 1;
    }
    $state->{storeid_list} = [ sort keys %used_storages ];

    # freeze filesystem for data consistency
    if ($freezefs) {
        $logfunc->("freeze guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 0);
    }

    # make snapshot of all volumes
    my $sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);

    my $replicate_snapshots = {}; # volumes which already got the new snapshot
    eval {
        for my $volid (@$sorted_volids) {
            $logfunc->("create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    my $err = $@;

    # thaw immediately, even if taking the snapshots failed
    if ($freezefs) {
        $logfunc->("thaw guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 1);
    }

    # best-effort removal of snapshot $snapname on all volumes in $volid_hash
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;

        for my $volid (sort keys %$volid_hash) {
            $logfunc->("delete previous replication snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname); };
            warn $@ if $@;
        }
    };

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $err;
    }

    eval {
        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        $logfunc->("using $migration_type transmission, rate limit: "
            . ($rate ? "$rate MByte/s" : "none"));

        for my $volid (@$sorted_volids) {
            my $base_snapname = $base_snapshots->{$volid};

            my $sync_msg = defined($base_snapname)
                ? "incremental sync '$volid' ($base_snapname => $sync_snapname)"
                : "full sync '$volid' ($sync_snapname)";
            $logfunc->($sync_msg);

            replicate_volume(
                $ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure, $logfunc);
        }
    };

    if ($err = $@) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $err;
    }

    # Ensure that new sync is recorded before removing old replication snapshots.
    PVE::ReplicationState::record_sync_end($jobcfg, $state, $start_time);

    # remove old snapshots because they are no longer needed
    $cleanup_local_snapshots->($local_snapshots, $last_sync_snapname);

    eval {
        remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
    };

    # a failure here is not fatal - old snapshots will be removed by the next
    # run of prepare_local_job
    if (my $finalize_err = $@) {
        # warn is for syslog/journal.
        warn $finalize_err;

        # logfunc writes to the replication log.
        $logfunc->("delete stale replication snapshot error: $finalize_err");
    }

    return $volumes;
}
408
# Run replicate() for one job without taking the guest migration lock;
# run_replication() passes this code reference to guest_migration_lock().
#
# Records job start and end in the job state, and writes all log messages to
# the job's log file (truncated on each run). If $logfunc is set, each message
# is also passed to it - prefixed with time and job ID if $verbose is set.
#
# Returns the result of replicate(). If replicate() failed, the error is
# recorded via record_job_end() and then re-thrown.
my $run_replication_nolock = sub {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;

    my $jobid = $jobcfg->{id};

    # we normally write errors into the state file,
    # but we also catch unexpected errors and log them to syslog
    # (for example when there are problems writing the state file)

    my $state = PVE::ReplicationState::read_job_state($jobcfg);

    PVE::ReplicationState::record_job_start($jobcfg, $state, $start_time, $iteration);

    my $t0 = [gettimeofday];

    mkdir $PVE::ReplicationState::replicate_logdir;
    my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
    open(my $logfd, '>', $logfile)
        or die "unable to open replication log '$logfile' - $!\n";

    my $logfunc_wrapper = sub {
        my ($msg) = @_;

        my $ctime = get_log_time();
        my $stamped = "$ctime $jobid: $msg";

        print {$logfd} "$stamped\n";
        if ($logfunc) {
            $logfunc->($verbose ? $stamped : $msg);
        }
    };

    $logfunc_wrapper->("start replication job");

    my $volumes = eval {
        replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
    };
    my $err = $@;

    if (!$err) {
        $logfunc_wrapper->("end replication job");
    } else {
        my $msg = "end replication job with error: $err";
        chomp $msg;
        $logfunc_wrapper->($msg);
    }

    PVE::ReplicationState::record_job_end($jobcfg, $state, $start_time, tv_interval($t0), $err);

    close($logfd);

    die $err if $err;

    return $volumes;
};
468
# Run the replication job $jobcfg via PVE::GuestHelpers::guest_migration_lock()
# for the job's guest, using a timeout of 2.
#
# Returns whatever the locked code returns, i.e. the result of replicate().
sub run_replication {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;

    my $timeout = 2; # do not wait too long - we repeat periodically anyways

    my @job_args = ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose);

    my $volumes = PVE::GuestHelpers::guest_migration_lock(
        $jobcfg->{guest}, $timeout, $run_replication_nolock, @job_args);

    return $volumes;
}
481
# Check whether $snapshot_name is a replication snapshot, i.e. starts with
# '__replicate_'.
#
# If $jobid is given, only snapshots belonging to that job match. The job ID
# must be followed by the '_' separator, so that job '100-1' does not match
# the snapshots of job '100-10'.
# NOTE(review): relies on the naming scheme '__replicate_<jobid>_<timestamp>__'
# generated by PVE::ReplicationState::replication_snapshot_name - confirm.
#
# Returns 1 on match, 0 otherwise.
sub is_replication_snapshot {
    my ($snapshot_name, $jobid) = @_;

    if (defined($jobid)) {
        return $snapshot_name =~ m/^__replicate_\Q$jobid\E_/ ? 1 : 0;
    }

    return $snapshot_name =~ m/^__replicate_/ ? 1 : 0;
}
491
a6538c1e 4921;