]> git.proxmox.com Git - pve-guest-common.git/blame - PVE/Replication.pm
Add foreach_unused_volume
[pve-guest-common.git] / PVE / Replication.pm
CommitLineData
a6538c1e
DM
1package PVE::Replication;
2
3use warnings;
4use strict;
5use Data::Dumper;
6use JSON;
7use Time::HiRes qw(gettimeofday tv_interval);
93c3695b 8use POSIX qw(strftime);
a6538c1e
DM
9
10use PVE::INotify;
11use PVE::ProcFSTools;
12use PVE::Tools;
13use PVE::Cluster;
96c08a9d 14use PVE::DataCenterConfig;
a6538c1e
DM
15use PVE::Storage;
16use PVE::GuestHelpers;
17use PVE::ReplicationConfig;
18use PVE::ReplicationState;
0c85474f 19use PVE::SSHInfo;
a6538c1e
DM
20
21
# Timestamp used as prefix for every replication log line, formatted as
# "YYYY-MM-DD HH:MM:SS" in local time.
# Regression tests should overwrite this sub to get reproducible output.
sub get_log_time {
    my @now = localtime(time());
    return strftime("%F %H:%M:%S", @now);
}
27
# Find the common base replication snapshot for each volume, i.e. a snapshot
# that is available both on the local and on the remote side.
#
# Note: this also removes stale replication snapshots, locally via prepare()
# and on the target via remote_prepare_local_job().
#
# Preference per volume: the snapshot of the last sync, then the guest's
# parent snapshot ($parent_snapname). If $last_sync is 0 (job state unknown),
# the newest replication snapshot of this job present on both sides is used.
#
# Returns ($base_snapshots, $last_snapshots, $last_sync_snapname):
#   $base_snapshots     - { volid => base snapshot name }; volumes without an
#                         entry have no common base (full sync needed)
#   $last_snapshots     - local { volid => { snapname => 1 } } from prepare()
#   $last_sync_snapname - name of the replication snapshot of $last_sync
#
# Dies if $last_sync is 0 and a volume present on both sides has no common
# replication snapshot.
sub find_common_replication_snapshot {
    my ($ssh_info, $jobid, $vmid, $storecfg, $volumes, $storeid_list, $last_sync, $parent_snapname, $logfunc) = @_;

    my $last_sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);

    # test if we have a replication_ snapshot from last sync
    # and remove all other/stale replication snapshots
    my $last_snapshots = prepare(
        $storecfg, $volumes, $jobid, $last_sync, $parent_snapname, $logfunc);

    # prepare remote side
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, 0, $logfunc);

    my $base_snapshots = {};

    foreach my $volid (@$volumes) {
        my $local = $last_snapshots->{$volid};
        my $remote = $remote_snapshots->{$volid};

        # no snapshots on one of the sides - no common base, full sync
        next if !defined($local) || !defined($remote);

        if ($local->{$last_sync_snapname} && $remote->{$last_sync_snapname}) {
            $base_snapshots->{$volid} = $last_sync_snapname;
        } elsif (defined($parent_snapname) &&
                 ($local->{$parent_snapname} && $remote->{$parent_snapname})) {
            $base_snapshots->{$volid} = $parent_snapname;
        } elsif ($last_sync == 0) {
            # job state is unknown: sort the remote snapshots by the sync time
            # embedded in their name (newest first) and take the first one
            # that also exists locally
            my @desc_sorted_snap =
                map { $_->[1] } sort { $b->[0] <=> $a->[0] }
                map { [ ($_ =~ /__replicate_\Q$jobid\E_(\d+)_/)[0] || 0, $_ ] }
                keys %$remote;

            foreach my $remote_snap (@desc_sorted_snap) {
                if (defined($local->{$remote_snap})) {
                    $base_snapshots->{$volid} = $remote_snap;
                    last;
                }
            }
            die "No common base to restore the job state\n".
                "please delete jobid: $jobid and create the job again\n"
                if !defined($base_snapshots->{$volid});
        }
    }

    return ($base_snapshots, $last_snapshots, $last_sync_snapname);
}
80
# Run 'pvesr prepare-local-job' for $jobid on the replication target via SSH.
#
# $volumes      - volume IDs of the job (passed as positional arguments)
# $storeid_list - storages to scan on the target (passed as --scan)
# $last_sync    - passed as --last_sync
# $parent_snapname - passed as --parent_snapname if set
# $force        - pass --force if true
# $logfunc      - receives the remote command's stderr, line by line
#
# Each stdout line of the remote command is decoded as JSON; the last decoded
# value (the remote replication snapshots) is returned.
# Dies if the remote command produced no result.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;

    # tolerate undefined lists (e.g. a job state without recorded storeid_list)
    # instead of dying on the array dereference under strict refs
    my @storeids = @{$storeid_list // []};
    my @volids = @{$volumes // []};

    my $ssh_cmd = PVE::SSHInfo::ssh_info_to_command($ssh_info);
    my $cmd = [@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid];
    push @$cmd, '--scan', join(',', @storeids) if scalar(@storeids);
    push @$cmd, @volids if scalar(@volids);

    push @$cmd, '--last_sync', $last_sync;
    push @$cmd, '--parent_snapname', $parent_snapname
        if $parent_snapname;
    push @$cmd, '--force' if $force;

    my $remote_snapshots;

    my $parser = sub {
        my $line = shift;
        $remote_snapshots = JSON::decode_json($line);
    };

    my $logger = sub {
        my $line = shift;
        chomp $line;
        $logfunc->("(remote_prepare_local_job) $line");
    };

    PVE::Tools::run_command($cmd, outfunc => $parser, errfunc => $logger);

    die "prepare remote node failed - no result\n"
        if !defined($remote_snapshots);

    return $remote_snapshots;
}
114
# Run 'pvesr finalize-local-job' for $jobid on the replication target via SSH,
# passing the job's volume IDs and the new sync time as --last_sync.
# Both stdout and stderr of the remote command are forwarded to $logfunc;
# errors raised by run_command propagate to the caller.
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;

    my $log_line = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_finalize_local_job) $line");
    };

    my @cmd = (
        @{PVE::SSHInfo::ssh_info_to_command($ssh_info)},
        '--', 'pvesr', 'finalize-local-job', $jobid,
        @$volumes,
        '--last_sync', $last_sync,
    );

    PVE::Tools::run_command(\@cmd, outfunc => $log_line, errfunc => $log_line);
}
130
# Scan the snapshots of all volumes in $volids and sort out replication
# snapshots.
#
# Kept and reported: the snapshot of the last sync (only if $jobid is given)
# and $parent_snapname. Every other snapshot carrying the replication prefix
# (of $jobid, or of any job if $jobid is undef) is deleted - unless
# $last_sync is 0, in which case all of them are kept and reported too.
#
# In list context returns ($last_snapshots, $cleaned_replicated_volumes),
# in scalar context only $last_snapshots:
#   $last_snapshots             - { volid => { snapname => 1 } }
#   $cleaned_replicated_volumes - { volid => 1 } for volumes on which a
#                                 stale snapshot was deleted successfully
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;

    $last_sync //= 0;

    my ($prefix, $snapname) = defined($jobid)
        ? PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync)
        : ('__replicate_', undef);

    # true for the snapshots we want to keep as possible sync bases
    my $is_last_snapshot = sub {
        my ($snap) = @_;
        return 1 if defined($snapname) && $snap eq $snapname;
        return 1 if defined($parent_snapname) && $snap eq $parent_snapname;
        return 0;
    };

    my $last_snapshots = {};
    my $cleaned_replicated_volumes = {};

    for my $volid (@$volids) {
        my $snapshots = PVE::Storage::volume_snapshot_list($storecfg, $volid);

        for my $snap (@$snapshots) {
            if ($is_last_snapshot->($snap)) {
                $last_snapshots->{$volid}->{$snap} = 1;
                next;
            }

            # not a replication snapshot - leave it alone
            next if $snap !~ m/^\Q$prefix\E/;

            # Last_sync=0 and a replication snapshot only occur, if the VM was
            # stolen - keep them all so a common base can still be found
            if ($last_sync == 0) {
                $last_snapshots->{$volid}->{$snap} = 1;
                next;
            }

            $logfunc->("delete stale replication snapshot '$snap' on $volid");
            eval {
                PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
                $cleaned_replicated_volumes->{$volid} = 1;
            };

            # A failed delete may also be a timeout where the delete actually
            # went through, so only report it; a real failure is retried on
            # the next run.
            if (my $err = $@) {
                warn $err; # for syslog/journal
                $logfunc->("delete stale replication snapshot error: $err"); # for the replication log
            }
        }
    }

    return wantarray ? ($last_snapshots, $cleaned_replicated_volumes) : $last_snapshots;
}
182
# Send volume $volid to the replication target using
# PVE::Storage::storage_migrate, incrementally from $base_snapshot when it is
# defined, up to the replication snapshot $sync_snapname.
#
# $rate     - bandwidth limit, multiplied by 1000000 to get bytes/s;
#             undef or 0 means no limit
# $insecure - true when the datacenter migration type is 'insecure'
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure, $logfunc) = @_;

    my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);

    # Do not use 'my $x = ... if $cond;' - a 'my' with a conditional
    # statement modifier has undefined behaviour in Perl (the variable may
    # keep a stale value from a previous call).
    my $ratelimit_bps = $rate ? int(1000000 * $rate) : undef;

    PVE::Storage::storage_migrate($storecfg, $volid, $ssh_info, $storeid, $volname,
        $base_snapshot, $sync_snapname, $ratelimit_bps, $insecure, 1, $logfunc);
}
192
193
# Run one replication of guest $jobcfg->{guest} for a job of type 'local'.
#
# $guest_class - provides guest specific helpers (load_config, freeze,
#                get_replicatable_volumes, guest_type)
# $state       - job state; last_sync and storeid_list are read, storeid_list
#                is updated here
# $start_time  - must be newer than last_sync; names the new replication
#                snapshot and is passed to the target as new last_sync
# $logfunc     - receives all log messages
#
# If $jobcfg->{remove_job} is set, the job is removed instead: replication
# snapshots are deleted locally (and for mode 'full' the target is cleaned
# up too), the job is deleted from the configuration and undef is returned.
#
# Otherwise returns the volume hash from get_replicatable_volumes(). Dies on
# errors; replication snapshots created by this run are removed locally first.
sub replicate {
    my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;

    my $local_node = PVE::INotify::nodename();

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    my $migration_network;
    my $migration_type = 'secure';
    if (my $mc = $dc_conf->{migration}) {
        $migration_network = $mc->{network};
        $migration_type = $mc->{type} if defined($mc->{type});
    }

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();
    my $last_sync = $state->{last_sync};

    die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
        if $start_time <= $last_sync;

    my $vmid = $jobcfg->{guest};

    my $conf = $guest_class->load_config($vmid);
    my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
    my $volumes = $guest_class->get_replicatable_volumes($storecfg, $vmid, $conf, defined($jobcfg->{remove_job}));

    my $sorted_volids = [ sort keys %$volumes ];

    $running //= 0; # to avoid undef warnings from logfunc

    my $guest_name = $guest_class->guest_type() . ' ' . $vmid;

    $logfunc->("guest => $guest_name, running => $running");
    $logfunc->("volumes => " . join(',', @$sorted_volids));

    if (my $remove_job = $jobcfg->{remove_job}) {

        $logfunc->("start job removal - mode '${remove_job}'");

        if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
            # remove all remote volumes
            my @store_list = map { (PVE::Storage::parse_volume_id($_))[0] } @$sorted_volids;

            my %hash = map { $_ => 1 } @store_list;

            my $ssh_info = PVE::SSHInfo::get_ssh_info($jobcfg->{target});
            remote_prepare_local_job($ssh_info, $jobid, $vmid, [], [ keys %hash ], 1, undef, 1, $logfunc);
        }

        # remove all local replication snapshots: last_sync => 1 names a sync
        # snapshot that will not exist, so prepare() deletes every replication
        # snapshot of this job (last_sync => 0 would keep them all)
        prepare($storecfg, $sorted_volids, $jobid, 1, undef, $logfunc);

        PVE::ReplicationConfig::delete_job($jobid); # update config
        $logfunc->("job removed");

        return undef;
    }

    my $ssh_info = PVE::SSHInfo::get_ssh_info($jobcfg->{target}, $migration_network);

    my $parent_snapname = $conf->{parent};

    my ($base_snapshots, $last_snapshots, $last_sync_snapname) = find_common_replication_snapshot(
        $ssh_info, $jobid, $vmid, $storecfg, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, $logfunc);

    my $storeid_hash = {};
    foreach my $volid (@$sorted_volids) {
        my ($storeid) = PVE::Storage::parse_volume_id($volid);
        $storeid_hash->{$storeid} = 1;
    }
    $state->{storeid_list} = [ sort keys %$storeid_hash ];

    # freeze filesystem for data consistency
    if ($freezefs) {
        $logfunc->("freeze guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 0);
    }

    # make snapshot of all volumes
    my $sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);

    my $replicate_snapshots = {};
    eval {
        foreach my $volid (@$sorted_volids) {
            $logfunc->("create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    my $err = $@;

    # thaw immediately
    if ($freezefs) {
        $logfunc->("thaw guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 1);
    }

    # best effort: delete $snapname on every volume that is a key of $volid_hash
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;
        foreach my $volid (sort keys %$volid_hash) {
            $logfunc->("delete previous replication snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname); };
            warn $@ if $@;
        }
    };

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $err;
    }

    eval {
        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        foreach my $volid (@$sorted_volids) {
            my $base_snapname = $base_snapshots->{$volid};

            if (defined($base_snapname)) {
                $logfunc->("incremental sync '$volid' ($base_snapname => $sync_snapname)");
            } else {
                $logfunc->("full sync '$volid' ($sync_snapname)");
            }

            replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure, $logfunc);
        }
    };

    if ($err = $@) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $err;
    }

    # Remove the previous sync snapshot because it is no longer needed - but
    # only on volumes where it actually exists. $last_snapshots may also list
    # volumes that only have the parent snapshot, or (with last_sync 0) other
    # replication snapshots; deleting there would just fail and warn.
    my $volids_with_last_sync_snap = {};
    foreach my $volid (keys %$last_snapshots) {
        $volids_with_last_sync_snap->{$volid} = 1
            if $last_snapshots->{$volid}->{$last_sync_snapname};
    }
    $cleanup_local_snapshots->($volids_with_last_sync_snap, $last_sync_snapname);

    eval {
        remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
    };

    # A failed finalize is not fatal: old remote snapshots will be removed by
    # the next run of prepare_local_job.
    if ($err = $@) {
        # warn is for syslog/journal.
        warn $err;

        # logfunc writes into the replication log.
        $logfunc->("delete stale replication snapshot error: $err");
    }

    return $volumes;
}
352
# Run a single replication of $jobcfg without taking any lock itself
# (run_replication() calls it under the guest migration lock).
#
# Records job start and end (duration and error) in the job state and writes
# a per-job log file. Every log line is also passed to $logfunc if given,
# prefixed with time and job id when $verbose is set.
#
# Returns the volume hash from replicate(). Errors from replicate() are
# logged and recorded in the job state, then rethrown.
my $run_replication_nolock = sub {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;

    my $jobid = $jobcfg->{id};

    my $volumes;

    # Errors of the replication itself are written to the job log and the
    # state file before being rethrown. Errors while reading/writing the
    # state or opening the log file are not caught here and propagate
    # directly to the caller.
    my $state = PVE::ReplicationState::read_job_state($jobcfg);

    PVE::ReplicationState::record_job_start($jobcfg, $state, $start_time, $iteration);

    my $t0 = [gettimeofday];

    # the log directory may already exist, so the result is ignored
    mkdir $PVE::ReplicationState::replicate_logdir;
    my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
    open(my $logfd, '>', $logfile) ||
        die "unable to open replication log '$logfile' - $!\n";

    my $logfunc_wrapper = sub {
        my ($msg) = @_;

        my $ctime = get_log_time();
        print $logfd "$ctime $jobid: $msg\n";
        if ($logfunc) {
            if ($verbose) {
                $logfunc->("$ctime $jobid: $msg");
            } else {
                $logfunc->($msg);
            }
        }
    };

    $logfunc_wrapper->("start replication job");

    eval {
        $volumes = replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
    };
    my $err = $@;

    if ($err) {
        my $msg = "end replication job with error: $err";
        chomp $msg;
        $logfunc_wrapper->($msg);
    } else {
        $logfunc_wrapper->("end replication job");
    }

    PVE::ReplicationState::record_job_end($jobcfg, $state, $start_time, tv_interval($t0), $err);

    # buffered write errors only surface on close - report them, but do not
    # fail the job because of its log file
    close($logfd) or warn "unable to close replication log '$logfile' - $!\n";

    die $err if $err;

    return $volumes;
};
412
# Run replication job $jobcfg while holding the migration lock of guest
# $jobcfg->{guest}. All arguments are passed through unchanged to the
# lock-free replication runner; its result is returned.
sub run_replication {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;

    # do not wait too long for the lock - replication is repeated periodically anyways
    my $lock_timeout = 2;

    my @job_args = ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose);

    my $volumes = PVE::GuestHelpers::guest_migration_lock(
        $jobcfg->{guest}, $lock_timeout, $run_replication_nolock, @job_args);

    return $volumes;
}
425
4261;