]> git.proxmox.com Git - pve-guest-common.git/blame - src/PVE/Replication.pm
replication: pass guest config to find_common_replication_snapshot
[pve-guest-common.git] / src / PVE / Replication.pm
CommitLineData
a6538c1e
DM
1package PVE::Replication;
2
3use warnings;
4use strict;
5use Data::Dumper;
6use JSON;
7use Time::HiRes qw(gettimeofday tv_interval);
93c3695b 8use POSIX qw(strftime);
a6538c1e
DM
9
10use PVE::INotify;
11use PVE::ProcFSTools;
12use PVE::Tools;
13use PVE::Cluster;
96c08a9d 14use PVE::DataCenterConfig;
a6538c1e
DM
15use PVE::Storage;
16use PVE::GuestHelpers;
17use PVE::ReplicationConfig;
18use PVE::ReplicationState;
0c85474f 19use PVE::SSHInfo;
a6538c1e
DM
20
21
# Timestamp used as prefix for every replication log line
# (local time, "YYYY-MM-DD HH:MM:SS").
# Regression tests override this sub to get reproducible log output.
sub get_log_time {
    my @now = localtime();
    return strftime('%F %H:%M:%S', @now);
}
27
e4f63016
DM
# Find, per volume, a snapshot present both locally and on the replication
# target that can serve as base for an incremental sync.
# Side effect: prepare() removes stale local replication snapshots and
# remote_prepare_local_job() asks the target to do its own preparation
# (presumably the same cleanup on the remote side).
#
# Preference per volume: the snapshot of $last_sync, then the guest's parent
# snapshot ($guest_conf->{parent}); if $last_sync == 0, the newest replication
# snapshot of this job that exists on both sides. Volumes without snapshot
# information on either side get no entry (full sync).
#
# Returns ($base_snapshots, $last_snapshots, $last_sync_snapname):
#   $base_snapshots     - { volid => base snapshot name }
#   $last_snapshots     - local snapshots kept by prepare(), { volid => { snap => 1 } }
#   $last_sync_snapname - replication snapshot name for $last_sync
# Dies if $last_sync == 0 and a volume has no common snapshot at all.
sub find_common_replication_snapshot {
    my ($ssh_info, $jobid, $vmid, $storecfg, $volumes, $storeid_list, $last_sync, $guest_conf, $logfunc) = @_;

    my $parent_snapname = $guest_conf->{parent};

    # scalar context: only the snapshot name, not the prefix
    my $last_sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);

    # local side: keep last-sync/parent snapshots, drop other replication snapshots
    my $last_snapshots =
        prepare($storecfg, $volumes, $jobid, $last_sync, $parent_snapname, $logfunc);

    # remote side
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $volumes, $storeid_list,
        $last_sync, $parent_snapname, 0, $logfunc,
    );

    my $base_snapshots = {};

    for my $volid (@$volumes) {
        my $local = $last_snapshots->{$volid};
        my $remote = $remote_snapshots->{$volid};

        # nothing known on one of the sides -> no base, full sync
        next if !defined($local) || !defined($remote);

        if ($local->{$last_sync_snapname} && $remote->{$last_sync_snapname}) {
            $base_snapshots->{$volid} = $last_sync_snapname;
            next;
        }

        if (defined($parent_snapname) && $local->{$parent_snapname} && $remote->{$parent_snapname}) {
            $base_snapshots->{$volid} = $parent_snapname;
            next;
        }

        next if $last_sync != 0;

        # No recorded sync (e.g. a stolen guest): fall back to the newest
        # replication snapshot of this job that both sides still have.
        # Names not matching the pattern sort as timestamp 0.
        my @shared = grep { defined($local->{$_}) } keys %$remote;
        my ($newest) =
            map { $_->[1] }
            sort { $b->[0] <=> $a->[0] }
            map { [ ($_ =~ /__replicate_\Q$jobid\E_(\d+)_/)[0] || 0, $_ ] } @shared;

        die "No common base to restore the job state\n".
            "please delete jobid: $jobid and create the job again\n"
            if !defined($newest);

        $base_snapshots->{$volid} = $newest;
    }

    return ($base_snapshots, $last_snapshots, $last_sync_snapname);
}
89
a6538c1e
DM
# Run 'pvesr prepare-local-job' for $jobid on the target node over SSH and
# return the decoded JSON it prints (presumably { volid => { snapname => 1 } },
# mirroring prepare() - verify against pvesr).
#
# $volumes         - volume IDs to prepare (array ref, may be empty or undef)
# $storeid_list    - storage IDs passed as --scan (array ref, may be empty or undef)
# $last_sync       - passed as --last_sync
# $parent_snapname - passed as --parent_snapname when set
# $force           - pass --force when true
# $vmid            - not used here; kept for interface compatibility
# Errors from run_command propagate; dies if the remote printed no result.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;

    # Dereferencing undef dies under 'use strict refs'; treat a missing list
    # (e.g. a job state without a recorded storeid_list) as empty.
    $volumes //= [];
    $storeid_list //= [];

    my $ssh_cmd = PVE::SSHInfo::ssh_info_to_command($ssh_info);
    my $cmd = [@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid];
    push @$cmd, '--scan', join(',', @$storeid_list) if scalar(@$storeid_list);
    push @$cmd, @$volumes if scalar(@$volumes);

    push @$cmd, '--last_sync', $last_sync;
    push @$cmd, '--parent_snapname', $parent_snapname
        if $parent_snapname;
    push @$cmd, '--force' if $force;

    my $remote_snapshots;

    # stdout carries the JSON result; if several lines arrive, the last one wins
    my $parser = sub {
        my $line = shift;
        $remote_snapshots = JSON::decode_json($line);
    };

    # stderr is forwarded to the job log
    my $logger = sub {
        my $line = shift;
        chomp $line;
        $logfunc->("(remote_prepare_local_job) $line");
    };

    PVE::Tools::run_command($cmd, outfunc => $parser, errfunc => $logger);

    die "prepare remote node failed - no result\n"
        if !defined($remote_snapshots);

    return $remote_snapshots;
}
123
# Run 'pvesr finalize-local-job' for $jobid on the target node over SSH,
# passing the replicated volume IDs and the new $last_sync timestamp.
# Both stdout and stderr of the remote command are forwarded to $logfunc.
# $vmid is not used here.
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;

    my $log_remote = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_finalize_local_job) $line");
    };

    my @cmd = (
        @{ PVE::SSHInfo::ssh_info_to_command($ssh_info) },
        '--', 'pvesr', 'finalize-local-job', $jobid,
        @$volumes,
        '--last_sync', $last_sync,
    );

    return PVE::Tools::run_command(\@cmd, outfunc => $log_remote, errfunc => $log_remote);
}
139
# Scan the local snapshots of all volumes in $volids.
#
# - The replication snapshot for $last_sync (of $jobid) and $parent_snapname
#   are recorded in the result.
# - Other snapshots with the replication prefix (of $jobid, or of any job when
#   $jobid is undef) are deleted when $last_sync != 0 and recorded when
#   $last_sync == 0 (that combination only occurs for a stolen guest).
#   Deletion is best-effort: errors are warned and logged, not fatal.
#
# Returns { volid => { snapname => 1 } } in scalar context; in list context
# additionally { volid => 1 } for volumes where a deletion succeeded.
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;

    $last_sync //= 0;

    # without a job ID every replication snapshot is a candidate
    my $prefix = '__replicate_';
    my $keep_snapname;
    ($prefix, $keep_snapname) = PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync)
        if defined($jobid);

    my $kept = {};
    my $cleaned = {};

    for my $volid (@$volids) {
        my $snapshots = PVE::Storage::volume_snapshot_list($storecfg, $volid);

        for my $snap (@$snapshots) {
            my $is_sync_snap = defined($keep_snapname) && $snap eq $keep_snapname;
            my $is_parent_snap = defined($parent_snapname) && $snap eq $parent_snapname;

            if ($is_sync_snap || $is_parent_snap) {
                $kept->{$volid}->{$snap} = 1;
                next;
            }

            # not a replication snapshot -> leave it alone
            next if $snap !~ m/^\Q$prefix\E/;

            # last_sync == 0 with replication snapshots: guest was stolen, keep them
            if ($last_sync == 0) {
                $kept->{$volid}->{$snap} = 1;
                next;
            }

            $logfunc->("delete stale replication snapshot '$snap' on $volid");
            eval {
                PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
                $cleaned->{$volid} = 1;
            };

            # A failure may also be a timeout after which the delete actually
            # worked; either way the next run tries again, so only report it.
            if (my $err = $@) {
                warn $err; # syslog/journal
                $logfunc->("delete stale replication snapshot error: $err"); # replication log
            }
        }
    }

    return wantarray ? ($kept, $cleaned) : $kept;
}
191
# Transfer one volume to the target node with PVE::Storage::storage_migrate,
# sending snapshot $sync_snapname, incrementally from $base_snapshot if defined.
# $rate is in MByte/s (0/undef = unlimited) and is converted to bytes/s.
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure, $logfunc) = @_;

    my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);

    my $ratelimit_bps;
    $ratelimit_bps = int(1000000 * $rate) if $rate;

    return PVE::Storage::storage_migrate(
        $storecfg, $volid, $ssh_info, $storeid,
        {
            target_volname => $volname,
            base_snapshot => $base_snapshot,
            snapshot => $sync_snapname,
            ratelimit_bps => $ratelimit_bps,
            insecure => $insecure,
            with_snapshots => 1,
        },
        $logfunc,
    );
}
210
211
# Run one replication of a guest to $jobcfg->{target}, or remove the job if
# $jobcfg->{remove_job} is set.
#
# Steps: find common base snapshots, (optionally) freeze the guest filesystem,
# snapshot all replicatable volumes as $sync_snapname, thaw, transfer every
# volume (incremental where a base exists), delete the previous local sync
# snapshots and finally let the target finalize the job.
#
# Updates $state->{storeid_list}. Returns the replicatable volumes hash, or
# undef after a job removal. Dies on failure after cleaning up the local
# snapshots created in this run.
sub replicate {
    my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;

    my $local_node = PVE::INotify::nodename();

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    my $migration_network;
    my $migration_type = 'secure';
    if (my $mc = $dc_conf->{migration}) {
        $migration_network = $mc->{network};
        $migration_type = $mc->{type} if defined($mc->{type});
    }

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();
    my $last_sync = $state->{last_sync};

    die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
        if $start_time <= $last_sync;

    my $vmid = $jobcfg->{guest};

    my $conf = $guest_class->load_config($vmid);
    my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
    my $volumes = $guest_class->get_replicatable_volumes($storecfg, $vmid, $conf, defined($jobcfg->{remove_job}));

    my $sorted_volids = [ sort keys %$volumes ];

    $running //= 0; # to avoid undef warnings from logfunc

    my $guest_name = $guest_class->guest_type() . ' ' . $vmid;

    $logfunc->("guest => $guest_name, running => $running");
    $logfunc->("volumes => " . join(',', @$sorted_volids));

    if (my $remove_job = $jobcfg->{remove_job}) {

        $logfunc->("start job removal - mode '${remove_job}'");

        if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
            # remove all remote volumes
            my @store_list = map { (PVE::Storage::parse_volume_id($_))[0] } @$sorted_volids;

            my %hash = map { $_ => 1 } @store_list;

            my $ssh_info = PVE::SSHInfo::get_ssh_info($jobcfg->{target});
            remote_prepare_local_job($ssh_info, $jobid, $vmid, [], [ keys %hash ], 1, undef, 1, $logfunc);
        }

        # Remove all local replication snapshots of this job. prepare() only
        # deletes stale replication snapshots when last_sync != 0 (with 0 it
        # keeps them), so pass 1: only a snapshot named for timestamp 1 would
        # be kept, which presumably never exists.
        prepare($storecfg, $sorted_volids, $jobid, 1, undef, $logfunc);

        PVE::ReplicationConfig::delete_job($jobid); # update config
        $logfunc->("job removed");

        return undef;
    }

    my $ssh_info = PVE::SSHInfo::get_ssh_info($jobcfg->{target}, $migration_network);

    my ($base_snapshots, $last_snapshots, $last_sync_snapname) = find_common_replication_snapshot(
        $ssh_info, $jobid, $vmid, $storecfg, $sorted_volids, $state->{storeid_list}, $last_sync, $conf, $logfunc);

    my $storeid_hash = {};
    foreach my $volid (@$sorted_volids) {
        my ($storeid) = PVE::Storage::parse_volume_id($volid);
        $storeid_hash->{$storeid} = 1;
    }
    $state->{storeid_list} = [ sort keys %$storeid_hash ];

    # scalar context: only the snapshot name
    my $sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);

    # make snapshot of all volumes
    my $replicate_snapshots = {};
    eval {
        # Freeze filesystem for data consistency. This is done inside the eval
        # so that the thaw below also runs when freezing dies half-way,
        # instead of leaving the guest frozen.
        if ($freezefs) {
            $logfunc->("freeze guest filesystem");
            $guest_class->__snapshot_freeze($vmid, 0);
        }

        foreach my $volid (@$sorted_volids) {
            $logfunc->("create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    my $err = $@;

    # thaw immediately
    if ($freezefs) {
        $logfunc->("thaw guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 1);
    }

    # best-effort deletion of $snapname on every volume key of $volid_hash
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;
        foreach my $volid (sort keys %$volid_hash) {
            $logfunc->("delete previous replication snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname); };
            warn $@ if $@;
        }
    };

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $err;
    }

    eval {

        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        $logfunc->("using $migration_type transmission, rate limit: "
            . ($rate ? "$rate MByte/s" : "none"));

        foreach my $volid (@$sorted_volids) {
            my $base_snapname;

            if (defined($base_snapname = $base_snapshots->{$volid})) {
                $logfunc->("incremental sync '$volid' ($base_snapname => $sync_snapname)");
            } else {
                $logfunc->("full sync '$volid' ($sync_snapname)");
            }

            replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure, $logfunc);
        }
    };

    if ($err = $@) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $err;
    }

    # Remove the previous sync snapshot because it is no longer needed - but
    # only on volumes that actually have it. $last_snapshots may also list
    # volumes that only carry the parent snapshot, or (with last_sync == 0)
    # other replication snapshots; deleting there would just fail and warn.
    my $previous_sync_volids = {};
    foreach my $volid (keys %$last_snapshots) {
        $previous_sync_volids->{$volid} = 1
            if $last_snapshots->{$volid}->{$last_sync_snapname};
    }
    $cleanup_local_snapshots->($previous_sync_volids, $last_sync_snapname);

    eval {
        remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
    };

    # old snapshots will be removed by the next run of prepare_local_job.
    if ($err = $@) {
        # warn is for syslog/journal.
        warn $err;

        # logfunc writes into the replication log.
        $logfunc->("delete stale replication snapshot error: $err");
    }

    return $volumes;
}
371
# Run one replication job without taking the guest migration lock (the
# caller, run_replication, holds it).
#
# Records the job start/end in the replication state, writes a timestamped
# per-job log file and forwards every message to the optional $logfunc
# (with timestamp and job ID only when $verbose is set). An error from
# replicate() is recorded via record_job_end, logged, and then rethrown.
# Returns what replicate() returned.
my $run_replication_nolock = sub {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;

    my $jobid = $jobcfg->{id};

    my $state = PVE::ReplicationState::read_job_state($jobcfg);
    PVE::ReplicationState::record_job_start($jobcfg, $state, $start_time, $iteration);

    my $t0 = [gettimeofday];

    # result ignored: the directory usually exists already; a real problem
    # presumably surfaces when opening the log file below
    mkdir $PVE::ReplicationState::replicate_logdir;
    my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
    open(my $logfd, '>', $logfile)
        or die "unable to open replication log '$logfile' - $!\n";

    my $log = sub {
        my ($msg) = @_;

        my $line = get_log_time() . " $jobid: $msg";
        print $logfd "$line\n";

        return if !$logfunc;
        $logfunc->($verbose ? $line : $msg);
    };

    $log->("start replication job");

    my $volumes = eval { replicate($guest_class, $jobcfg, $state, $start_time, $log) };
    my $err = $@;

    my $end_msg = "end replication job";
    if ($err) {
        $end_msg .= " with error: $err";
        chomp $end_msg;
    }
    $log->($end_msg);

    PVE::ReplicationState::record_job_end($jobcfg, $state, $start_time, tv_interval($t0), $err);

    close($logfd);

    die $err if $err;

    return $volumes;
};
431
# Run a replication job while holding the guest's migration lock.
# Returns the replicatable volumes from the job (undef after a job removal);
# errors from the job, or from failing to get the lock, propagate.
sub run_replication {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;

    # do not wait long for the lock - the job is repeated periodically anyway
    my $lock_timeout = 2;

    return scalar(PVE::GuestHelpers::guest_migration_lock(
        $jobcfg->{guest},
        $lock_timeout,
        $run_replication_nolock,
        $guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose,
    ));
}
444
45c0b755
FE
# Return 1 if $snapshot_name is a replication snapshot (starts with
# '__replicate_'), 0 otherwise - including for an undefined name, which would
# otherwise trigger an 'uninitialized value' warning in the pattern match.
sub is_replication_snapshot {
    my ($snapshot_name) = @_;

    return 0 if !defined($snapshot_name);

    return $snapshot_name =~ m/^__replicate_/ ? 1 : 0;
}

1;