]> git.proxmox.com Git - pve-guest-common.git/blame - PVE/Replication.pm
use PVE::DataCenterConfig
[pve-guest-common.git] / PVE / Replication.pm
CommitLineData
a6538c1e
DM
1package PVE::Replication;
2
3use warnings;
4use strict;
5use Data::Dumper;
6use JSON;
7use Time::HiRes qw(gettimeofday tv_interval);
93c3695b 8use POSIX qw(strftime);
a6538c1e
DM
9
10use PVE::INotify;
11use PVE::ProcFSTools;
12use PVE::Tools;
13use PVE::Cluster;
96c08a9d 14use PVE::DataCenterConfig;
a6538c1e
DM
15use PVE::Storage;
16use PVE::GuestHelpers;
17use PVE::ReplicationConfig;
18use PVE::ReplicationState;
19
20
# Returns the current local time formatted as "YYYY-MM-DD HH:MM:SS" for use
# as prefix of replication log lines.
# Regression tests should overwrite this to get reproducible output.
sub get_log_time {
    my @now = localtime();

    return strftime('%F %H:%M:%S', @now);
}
26
e4f63016
DM
# Find the common base replication snapshot, available on the local and the
# remote side, for every volume in $volumes.
# Note: the prepare() / remote_prepare_local_job() calls done here also remove
# stale replication snapshots.
#
# Returns the list ($base_snapshots, $last_snapshots, $last_sync_snapname):
#   $base_snapshots     - hash volid => name of the snapshot usable as
#                         incremental base; volumes without an entry have no
#                         common base
#   $last_snapshots     - result of the local prepare() call
#   $last_sync_snapname - snapshot name belonging to $last_sync
#
# Dies if $last_sync is 0, both sides know the volume, and no snapshot name is
# shared between them.
sub find_common_replication_snapshot {
    my ($ssh_info, $jobid, $vmid, $storecfg, $volumes, $storeid_list, $last_sync, $parent_snapname, $logfunc) = @_;

    my $last_sync_snapname = PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);

    # local side: look for the replication snapshot of the last sync and
    # remove all other/stale replication snapshots
    my $last_snapshots = prepare($storecfg, $volumes, $jobid, $last_sync, $parent_snapname, $logfunc);

    # same on the remote side
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, 0, $logfunc);

    my $base_snapshots = {};

    for my $volid (@$volumes) {
        my $local = $last_snapshots->{$volid};
        my $remote = $remote_snapshots->{$volid};

        # a volume unknown to one side cannot have a common base
        next if !defined($local) || !defined($remote);

        if ($local->{$last_sync_snapname} && $remote->{$last_sync_snapname}) {
            $base_snapshots->{$volid} = $last_sync_snapname;
        } elsif (defined($parent_snapname) && $local->{$parent_snapname} && $remote->{$parent_snapname}) {
            $base_snapshots->{$volid} = $parent_snapname;
        } elsif ($last_sync == 0) {
            # No usable job state: walk the remote snapshots from the highest
            # to the lowest embedded timestamp (names not matching the job
            # pattern count as 0) and take the first one that also exists
            # locally.
            my %sync_time_of;
            for my $snap (keys %$remote) {
                my ($stamp) = $snap =~ /__replicate_\Q$jobid\E_(\d+)_/;
                $sync_time_of{$snap} = $stamp || 0;
            }
            my @newest_first = sort { $sync_time_of{$b} <=> $sync_time_of{$a} } keys %$remote;

            for my $remote_snap (@newest_first) {
                next if !defined($local->{$remote_snap});
                $base_snapshots->{$volid} = $remote_snap;
                last;
            }

            die "No common base to restore the job state\n".
                "please delete jobid: $jobid and create the job again\n"
                if !defined($base_snapshots->{$volid});
        }
    }

    return ($base_snapshots, $last_snapshots, $last_sync_snapname);
}
79
a6538c1e
DM
# Runs 'pvesr prepare-local-job' for $jobid through the SSH command derived
# from $ssh_info and returns the JSON document it prints on stdout, decoded.
#
# Optional command line parts:
#   --scan <storeids>   only when $storeid_list is non-empty
#   <volids...>         only when $volumes is non-empty
#   --parent_snapname   only when $parent_snapname is true
#   --force             only when $force is true
#
# Lines from stderr are passed to $logfunc. Dies if no result was received.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);

    my @cmd = (@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid);
    if (scalar(@$storeid_list)) {
        push @cmd, '--scan', join(',', @$storeid_list);
    }
    if (scalar(@$volumes)) {
        push @cmd, @$volumes;
    }
    push @cmd, '--last_sync', $last_sync;
    if ($parent_snapname) {
        push @cmd, '--parent_snapname', $parent_snapname;
    }
    if ($force) {
        push @cmd, '--force';
    }

    my $remote_snapshots;

    PVE::Tools::run_command(
        \@cmd,
        outfunc => sub {
            my ($line) = @_;
            # every stdout line is decoded; the last one wins
            $remote_snapshots = JSON::decode_json($line);
        },
        errfunc => sub {
            my ($line) = @_;
            chomp $line;
            $logfunc->("(remote_prepare_local_job) $line");
        },
    );

    if (!defined($remote_snapshots)) {
        die "prepare remote node failed - no result\n";
    }

    return $remote_snapshots;
}
113
# Runs 'pvesr finalize-local-job' for $jobid and the given volumes through the
# SSH command derived from $ssh_info, passing $last_sync via '--last_sync'.
# Every stdout and stderr line of the command is forwarded to $logfunc.
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);

    my @remote_call = ('pvesr', 'finalize-local-job', $jobid, @$volumes, '--last_sync', $last_sync);
    my $cmd = [@$ssh_cmd, '--', @remote_call];

    # same handler for both output streams
    my $forward_line = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_finalize_local_job) $line");
    };

    return PVE::Tools::run_command($cmd, outfunc => $forward_line, errfunc => $forward_line);
}
129
# Finds the local replication snapshots belonging to $last_sync and removes
# all replication snapshots with other time stamps.
#
# For each volume in $volids:
#   - a snapshot equal to the name for ($jobid, $last_sync) or to
#     $parent_snapname is recorded,
#   - any other snapshot starting with the replication prefix is deleted when
#     $last_sync is non-zero, and recorded (kept) when $last_sync is 0,
#   - all remaining snapshots are ignored.
# Without a $jobid the generic prefix '__replicate_' is used and no snapshot
# name is derived from $last_sync.
#
# Returns ($last_snapshots, $cleaned_replicated_volumes) in list context and
# only $last_snapshots otherwise. $last_snapshots maps volid => { snapname => 1 },
# $cleaned_replicated_volumes maps volid => 1 for volumes where a delete succeeded.
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;

    $last_sync //= 0;

    my $prefix = '__replicate_';
    my $snapname;
    ($prefix, $snapname) = PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync)
        if defined($jobid);

    my $is_wanted = sub {
        my ($snap) = @_;
        return 1 if defined($snapname) && ($snap eq $snapname);
        return 1 if defined($parent_snapname) && ($snap eq $parent_snapname);
        return 0;
    };

    my $last_snapshots = {};
    my $cleaned_replicated_volumes = {};

    for my $volid (@$volids) {
        my $list = PVE::Storage::volume_snapshot_list($storecfg, $volid);

        for my $snap (@$list) {
            if ($is_wanted->($snap)) {
                $last_snapshots->{$volid}->{$snap} = 1;
                next;
            }

            # not a replication snapshot - none of our business
            next if $snap !~ m/^\Q$prefix\E/;

            # last_sync=0 together with an existing replication snapshot only
            # occurs if the VM was stolen - keep the snapshot
            if ($last_sync == 0) {
                $last_snapshots->{$volid}->{$snap} = 1;
                next;
            }

            $logfunc->("delete stale replication snapshot '$snap' on $volid");
            eval {
                PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
                $cleaned_replicated_volumes->{$volid} = 1;
            };

            # If deleting the snapshot fails, we cannot be sure whether it was
            # due to an error or a timeout. On a timeout it is likely that the
            # delete succeeded anyway. If it really failed, the next run tries
            # to remove the snapshot again.
            if (my $err = $@) {
                # warn goes to syslog/journal
                warn $err;

                # logfunc writes to the replication log
                $logfunc->("delete stale replication snapshot error: $err");
            }
        }
    }

    return wantarray ? ($last_snapshots, $cleaned_replicated_volumes) : $last_snapshots;
}
181
# Transfers one volume to the replication target using
# PVE::Storage::storage_migrate(), keeping storage id and volume name.
#
# Parameters:
#   $ssh_info      - SSH connection info of the target, passed through
#   $storecfg      - storage configuration, passed through
#   $volid         - volume to transfer
#   $base_snapshot - snapshot to use as incremental base (may be undef)
#   $sync_snapname - snapshot to transfer
#   $rate          - optional rate limit; it is multiplied by 1000000 before
#                    being handed to storage_migrate (presumably MB/s to
#                    bytes/s - confirm against storage_migrate)
#   $insecure      - passed through to storage_migrate
#   $logfunc       - passed through to storage_migrate
#
# Returns whatever storage_migrate() returns.
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure, $logfunc) = @_;

    my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);

    # Do not write 'my $x = ... if COND;' - the behaviour of a 'my' combined
    # with a statement modifier is undefined (see perlsyn). Initialise
    # explicitly so that "no rate limit" is always undef.
    my $ratelimit_bps = $rate ? int(1000000 * $rate) : undef;

    return PVE::Storage::storage_migrate(
        $storecfg, $volid, $ssh_info, $storeid, $volname,
        $base_snapshot, $sync_snapname, $ratelimit_bps, $insecure, 1, $logfunc);
}
191
192
# Performs one run of the replication job described by $jobcfg.
#
# Parameters:
#   $guest_class - guest config class; load_config, guest_type,
#                  get_replicatable_volumes, __snapshot_check_freeze_needed
#                  and __snapshot_freeze are called on it
#   $jobcfg      - job configuration; id, type, guest, target, rate and
#                  remove_job are read
#   $state       - job state; last_sync and storeid_list are read,
#                  storeid_list is replaced with the storages used now
#   $start_time  - time stamp of this run, must be greater than last_sync
#   $logfunc     - code reference receiving log messages
#
# If $jobcfg->{remove_job} is set, the job is removed instead of synced and
# undef is returned. Otherwise the volumes hash from
# get_replicatable_volumes() is returned.
#
# Dies for job types other than 'local', if $start_time is not after the last
# sync, and on snapshot or transfer errors (after trying to remove the
# snapshots created by this run). A failing remote finalize step is only
# logged.
sub replicate {
    my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;

    my $local_node = PVE::INotify::nodename();

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    # migration settings from datacenter.cfg; the type defaults to 'secure'
    my $migration_network;
    my $migration_type = 'secure';
    my $migration_conf = $dc_conf->{migration};
    if ($migration_conf) {
        $migration_network = $migration_conf->{network};
        if (defined($migration_conf->{type})) {
            $migration_type = $migration_conf->{type};
        }
    }

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();
    my $last_sync = $state->{last_sync};

    if ($start_time <= $last_sync) {
        die "start time before last sync ($start_time <= $last_sync) - abort sync\n";
    }

    my $vmid = $jobcfg->{guest};

    my $conf = $guest_class->load_config($vmid);
    my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
    my $volumes = $guest_class->get_replicatable_volumes($storecfg, $vmid, $conf, defined($jobcfg->{remove_job}));

    my $sorted_volids = [ sort keys %$volumes ];

    $running //= 0; # to avoid undef warnings from logfunc

    my $guest_type = $guest_class->guest_type();
    my $guest_name = "$guest_type $vmid";

    $logfunc->("guest => $guest_name, running => $running");
    $logfunc->("volumes => " . join(',', @$sorted_volids));

    my $remove_job = $jobcfg->{remove_job};
    if ($remove_job) {

        $logfunc->("start job removal - mode '${remove_job}'");

        if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
            # remove all remote volumes: let the target scan every storage
            # used by this guest, with an empty volume list and --force
            my @storeids = map { (PVE::Storage::parse_volume_id($_))[0] } @$sorted_volids;
            my %unique_storeids;
            @unique_storeids{@storeids} = ();

            my $target_ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target});
            remote_prepare_local_job(
                $target_ssh_info, $jobid, $vmid, [], [ keys %unique_storeids ], 1, undef, 1, $logfunc);
        }

        # remove the local replication snapshots of this job; the non-zero
        # last_sync (1) makes prepare() delete instead of keep them
        prepare($storecfg, $sorted_volids, $jobid, 1, undef, $logfunc);

        PVE::ReplicationConfig::delete_job($jobid); # update config
        $logfunc->("job removed");

        return undef;
    }

    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);

    my $parent_snapname = $conf->{parent};

    my ($base_snapshots, $last_snapshots, $last_sync_snapname) = find_common_replication_snapshot(
        $ssh_info, $jobid, $vmid, $storecfg, $sorted_volids, $state->{storeid_list},
        $last_sync, $parent_snapname, $logfunc);

    # remember the storages used by this run in the job state
    my %used_storeids;
    for my $volid (@$sorted_volids) {
        my ($storeid) = PVE::Storage::parse_volume_id($volid);
        $used_storeids{$storeid} = 1;
    }
    $state->{storeid_list} = [ sort keys %used_storeids ];

    # freeze filesystem for data consistency
    if ($freezefs) {
        $logfunc->("freeze guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 0);
    }

    # make snapshot of all volumes
    my $sync_snapname = PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);

    my $replicate_snapshots = {};
    eval {
        for my $volid (@$sorted_volids) {
            $logfunc->("create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    my $snapshot_err = $@;

    # thaw immediately, also when taking the snapshots failed
    if ($freezefs) {
        $logfunc->("thaw guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 1);
    }

    # best effort removal of snapshot $snapname from all volumes that are keys
    # of $volid_hash; failures are only warned about
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;
        for my $volid (sort keys %$volid_hash) {
            $logfunc->("delete previous replication snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname); };
            warn $@ if $@;
        }
    };

    if ($snapshot_err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $snapshot_err;
    }

    eval {
        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        for my $volid (@$sorted_volids) {
            my $base_snapname = $base_snapshots->{$volid};

            if (defined($base_snapname)) {
                $logfunc->("incremental sync '$volid' ($base_snapname => $sync_snapname)");
            } else {
                $logfunc->("full sync '$volid' ($sync_snapname)");
            }

            replicate_volume(
                $ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure, $logfunc);
        }
    };

    if (my $sync_err = $@) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $sync_err;
    }

    # remove old snapshots because they are no longer needed
    $cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);

    eval {
        remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
    };

    # not fatal: old snapshots will be removed by the next run of prepare_local_job
    if (my $finalize_err = $@) {
        # warn goes to syslog/journal
        warn $finalize_err;

        # logfunc writes to the replication log
        $logfunc->("delete stale replication snapshot error: $finalize_err");
    }

    return $volumes;
}
351
# Runs replicate() for one job without taking the guest migration lock.
#
# Records job start and end in the replication state, (re)creates the job log
# file and writes every log message to it with a "<time> <jobid>: " prefix.
# Messages are also passed to the optional $logfunc - with the same prefix
# when $verbose is set, unmodified otherwise.
#
# Returns the result of replicate(). An error from replicate() is logged,
# stored through record_job_end() and then re-thrown.
my $run_replication_nolock = sub {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;

    my $jobid = $jobcfg->{id};

    my $volumes;

    # We normally write errors into the state file, but we also catch
    # unexpected errors and log them to syslog (for example when there are
    # problems writing the state file).

    my $state = PVE::ReplicationState::read_job_state($jobcfg);

    PVE::ReplicationState::record_job_start($jobcfg, $state, $start_time, $iteration);

    my $t0 = [gettimeofday];

    # result deliberately ignored - the directory may already exist
    mkdir $PVE::ReplicationState::replicate_logdir;
    my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
    open(my $logfd, '>', $logfile)
        or die "unable to open replication log '$logfile' - $!\n";

    my $logfunc_wrapper = sub {
        my ($msg) = @_;

        my $ctime = get_log_time();
        print $logfd "$ctime $jobid: $msg\n";
        if ($logfunc) {
            my $forwarded = $verbose ? "$ctime $jobid: $msg" : $msg;
            $logfunc->($forwarded);
        }
    };

    $logfunc_wrapper->("start replication job");

    eval {
        $volumes = replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
    };
    my $err = $@;

    if (!$err) {
        $logfunc_wrapper->("end replication job");
    } else {
        my $msg = "end replication job with error: $err";
        chomp $msg;
        $logfunc_wrapper->($msg);
    }

    PVE::ReplicationState::record_job_end($jobcfg, $state, $start_time, tv_interval($t0), $err);

    close($logfd);

    die $err if $err;

    return $volumes;
};
411
# Runs a replication job while holding the guest migration lock of
# $jobcfg->{guest}. All parameters are handed on unchanged to the unlocked
# worker; its result is returned.
sub run_replication {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;

    # do not wait too long - we repeat periodically anyways
    my $lock_timeout = 2;

    my @worker_args = ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose);

    my $replicated_volumes = PVE::GuestHelpers::guest_migration_lock(
        $jobcfg->{guest}, $lock_timeout, $run_replication_nolock, @worker_args);

    return $replicated_volumes;
}
424
4251;