]> git.proxmox.com Git - pve-guest-common.git/blame - PVE/Replication.pm
Remove noerr form replication.
[pve-guest-common.git] / PVE / Replication.pm
CommitLineData
a6538c1e
DM
# Storage replication for guests: prepares local and remote snapshots,
# transfers volumes to the target node and finalizes replication jobs.
package PVE::Replication;

use warnings;
use strict;
use Data::Dumper;
use JSON;
use Time::HiRes qw(gettimeofday tv_interval);
use POSIX qw(strftime);

use PVE::INotify;
use PVE::ProcFSTools;
use PVE::Tools;
use PVE::Cluster;
use PVE::Storage;
use PVE::GuestHelpers;
use PVE::ReplicationConfig;
use PVE::ReplicationState;
18
19
# Return the current local time as a "YYYY-MM-DD HH:MM:SS" string; it is
# used as the time stamp prefix of replication log lines.
# regression tests should overwrite this
sub get_log_time {
    return strftime("%F %H:%M:%S", localtime);
}
25
e4f63016
DM
# Find common base replication snapshot, available on local and remote side.
# Note: this also removes stale replication snapshots
#
# Both sides are prepared first: prepare() runs locally and
# remote_prepare_local_job() (without --force) runs on the target. For each
# volume reported by both sides, the snapshot of the last sync is preferred
# as base; otherwise $parent_snapname is used if it is defined and present on
# both sides. A volume without a common snapshot gets no entry in the result.
#
# Returns the list ($base_snapshots, $last_snapshots, $last_sync_snapname):
#   $base_snapshots     - hash: volid => name of the common base snapshot
#   $last_snapshots     - hash returned by the local prepare() call
#   $last_sync_snapname - replication snapshot name for ($jobid, $last_sync)
sub find_common_replication_snapshot {
    my ($ssh_info, $jobid, $vmid, $storecfg, $volumes, $storeid_list, $last_sync, $parent_snapname, $logfunc) = @_;

    my $last_sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);

    # test if we have a replication_ snapshot from last sync
    # and remove all other/stale replication snapshots

    my $last_snapshots = prepare(
        $storecfg, $volumes, $jobid, $last_sync, $parent_snapname, $logfunc);

    # prepare remote side
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, 0, $logfunc);

    my $base_snapshots = {};

    foreach my $volid (@$volumes) {
        # the defined() guards also avoid autovivifying entries for volumes
        # that one of the two sides did not report
        if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) {
            if ($last_snapshots->{$volid}->{$last_sync_snapname} &&
                $remote_snapshots->{$volid}->{$last_sync_snapname}) {
                $base_snapshots->{$volid} = $last_sync_snapname;
            } elsif (defined($parent_snapname) &&
                ($last_snapshots->{$volid}->{$parent_snapname} &&
                 $remote_snapshots->{$volid}->{$parent_snapname})) {
                $base_snapshots->{$volid} = $parent_snapname;
            }
        }
    }

    return ($base_snapshots, $last_snapshots, $last_sync_snapname);
}
63
a6538c1e
DM
# Run 'pvesr prepare-local-job' on the target node via ssh and return the
# decoded result.
#
# The command line is built from $jobid, an optional '--scan' list made from
# $storeid_list, the volume IDs in $volumes, '--last_sync', and optionally
# '--parent_snapname' and '--force'. $vmid is accepted but not used here.
#
# Every stdout line is decoded as JSON and overwrites the previous result, so
# the last line wins. Every stderr line is passed to $logfunc with a
# "(remote_prepare_local_job)" prefix.
#
# Dies if the remote command produced no result.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
    my $cmd = [@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid];
    push @$cmd, '--scan', join(',', @$storeid_list) if scalar(@$storeid_list);
    push @$cmd, @$volumes if scalar(@$volumes);

    push @$cmd, '--last_sync', $last_sync;
    push @$cmd, '--parent_snapname', $parent_snapname
        if $parent_snapname;
    push @$cmd, '--force' if $force;

    my $remote_snapshots;

    my $parser = sub {
        my $line = shift;
        $remote_snapshots = JSON::decode_json($line);
    };

    my $logger = sub {
        my $line = shift;
        chomp $line;
        $logfunc->("(remote_prepare_local_job) $line");
    };

    PVE::Tools::run_command($cmd, outfunc => $parser, errfunc => $logger);

    die "prepare remote node failed - no result\n"
        if !defined($remote_snapshots);

    return $remote_snapshots;
}
97
# Run 'pvesr finalize-local-job' on the target node via ssh.
#
# The remote command receives $jobid, the volume IDs in $volumes and
# '--last_sync' $last_sync. $vmid is accepted but not used here.
# Both stdout and stderr lines are passed to $logfunc with a
# "(remote_finalize_local_job)" prefix.
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
    my $cmd = [@$ssh_cmd, '--', 'pvesr', 'finalize-local-job', $jobid,
               @$volumes, '--last_sync', $last_sync];

    my $logger = sub {
        my $line = shift;
        chomp $line;
        $logfunc->("(remote_finalize_local_job) $line");
    };

    PVE::Tools::run_command($cmd, outfunc => $logger, errfunc => $logger);
}
113
# finds local replication snapshots from $last_sync
# and removes all replication snapshots with other time stamps
#
# $last_sync defaults to 0. With a defined $jobid, the snapshot prefix and
# the snapshot name to keep come from
# PVE::ReplicationState::replication_snapshot_name(). Without a $jobid the
# generic prefix '__replicate_' is used and there is no snapshot name to
# keep, so every snapshot with that prefix is deleted unless it equals
# $parent_snapname.
#
# Snapshots equal to the kept name or to $parent_snapname are recorded in
# the result; they are never deleted, even if they match the prefix.
#
# Returns, in list context, ($last_snapshots, $cleaned_replicated_volumes):
#   $last_snapshots             - hash: volid => { snapname => 1 }
#   $cleaned_replicated_volumes - hash: volid => 1 for every volume that had
#                                 at least one stale snapshot deleted
# In scalar context only $last_snapshots is returned.
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;

    $last_sync //= 0;

    my ($prefix, $snapname);

    if (defined($jobid)) {
        ($prefix, $snapname) = PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
    } else {
        $prefix = '__replicate_';
    }

    my $last_snapshots = {};
    my $cleaned_replicated_volumes = {};
    foreach my $volid (@$volids) {
        my $list = PVE::Storage::volume_snapshot_list($storecfg, $volid);
        foreach my $snap (@$list) {
            if ((defined($snapname) && ($snap eq $snapname)) ||
                (defined($parent_snapname) && ($snap eq $parent_snapname))) {
                $last_snapshots->{$volid}->{$snap} = 1;
            } elsif ($snap =~ m/^\Q$prefix\E/) {
                $logfunc->("delete stale replication snapshot '$snap' on $volid");
                PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
                $cleaned_replicated_volumes->{$volid} = 1;
            }
        }
    }

    return wantarray ? ($last_snapshots, $cleaned_replicated_volumes) : $last_snapshots;
}
147
# Transfer a single volume to the target node using
# PVE::Storage::storage_migrate().
#
# The target storage ID and volume name are the ones parsed from $volid.
# $base_snapshot and $sync_snapname are passed through unchanged; callers in
# this module pass an undefined $base_snapshot for a full sync.
# $rate is multiplied by 1000000 and truncated to an integer to get the rate
# limit passed on; a false $rate (undef, 0, '') passes undef instead.
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure, $logfunc) = @_;

    my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);

    # do not use 'my $x = ... if COND;' - perlsyn documents the behaviour of
    # a 'my' with a statement modifier as undefined
    my $ratelimit_bps = $rate ? int(1000000 * $rate) : undef;

    PVE::Storage::storage_migrate($storecfg, $volid, $ssh_info, $storeid, $volname,
                                  $base_snapshot, $sync_snapname, $ratelimit_bps, $insecure, 1, $logfunc);
}
157
158
# Run one replication job for a guest.
#
# Parameters:
#   $guest_class - guest config class; used for load_config(),
#                  get_replicatable_volumes(), guest_type() and the
#                  snapshot freeze helpers
#   $jobcfg      - job configuration; type, id, guest, target, rate and
#                  remove_job are read here
#   $state       - job state; last_sync and storeid_list are read, and
#                  storeid_list is updated in place
#   $start_time  - start time of this run, must be greater than last_sync
#   $logfunc     - code reference called with each log message
#
# If the job is marked for removal, the replication snapshots are removed
# (on the target too, for mode 'full' with a non-local target), the job is
# deleted from the replication config and undef is returned.
#
# Otherwise a new replication snapshot is created on every volume, each
# volume is synced incrementally when a common base snapshot exists and
# fully otherwise, the previous replication snapshots are removed and the
# job is finalized on the target.
#
# Returns the hash of replicated volumes as returned by
# get_replicatable_volumes(). Dies on error; the snapshots created locally
# by this run are removed before the error is re-thrown.
sub replicate {
    my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;

    my $local_node = PVE::INotify::nodename();

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    my $migration_network;
    my $migration_type = 'secure';
    if (my $mc = $dc_conf->{migration}) {
        $migration_network = $mc->{network};
        $migration_type = $mc->{type} if defined($mc->{type});
    }

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();
    my $last_sync = $state->{last_sync};

    die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
        if $start_time <= $last_sync;

    my $vmid = $jobcfg->{guest};

    my $conf = $guest_class->load_config($vmid);
    my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
    my $volumes = $guest_class->get_replicatable_volumes($storecfg, $vmid, $conf, defined($jobcfg->{remove_job}));

    my $sorted_volids = [ sort keys %$volumes ];

    $running //= 0; # to avoid undef warnings from logfunc

    my $guest_name = $guest_class->guest_type() . ' ' . $vmid;

    $logfunc->("guest => $guest_name, running => $running");
    $logfunc->("volumes => " . join(',', @$sorted_volids));

    if (my $remove_job = $jobcfg->{remove_job}) {

        $logfunc->("start job removal - mode '${remove_job}'");

        if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
            # remove all remote volumes
            my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target});
            remote_prepare_local_job($ssh_info, $jobid, $vmid, [], $state->{storeid_list}, 0, undef, 1, $logfunc);

        }
        # remove all local replication snapshots (lastsync => 0)
        prepare($storecfg, $sorted_volids, $jobid, 0, undef, $logfunc);

        PVE::ReplicationConfig::delete_job($jobid); # update config
        $logfunc->("job removed");

        return undef;
    }

    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);

    my $parent_snapname = $conf->{parent};

    my ($base_snapshots, $last_snapshots, $last_sync_snapname) = find_common_replication_snapshot(
        $ssh_info, $jobid, $vmid, $storecfg, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, $logfunc);

    # remember the storages used by this run; the list is handed to the
    # remote prepare call (--scan) on later runs
    my $storeid_hash = {};
    foreach my $volid (@$sorted_volids) {
        my ($storeid) = PVE::Storage::parse_volume_id($volid);
        $storeid_hash->{$storeid} = 1;
    }
    $state->{storeid_list} = [ sort keys %$storeid_hash ];

    # freeze filesystem for data consistency
    if ($freezefs) {
        $logfunc->("freeze guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 0);
    }

    # make snapshot of all volumes
    my $sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);

    # track which volumes already got the new snapshot, so that only those
    # are cleaned up on error
    my $replicate_snapshots = {};
    eval {
        foreach my $volid (@$sorted_volids) {
            $logfunc->("create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    my $err = $@;

    # thaw immediately (also when taking the snapshots failed)
    if ($freezefs) {
        $logfunc->("thaw guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 1);
    }

    # best effort removal of snapshot $snapname on all volumes in
    # $volid_hash; failures are only reported with warn
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;
        foreach my $volid (sort keys %$volid_hash) {
            $logfunc->("delete previous replication snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname); };
            warn $@ if $@;
        }
    };

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $err;
    }

    eval {

        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        foreach my $volid (@$sorted_volids) {
            my $base_snapname;

            if (defined($base_snapname = $base_snapshots->{$volid})) {
                $logfunc->("incremental sync '$volid' ($base_snapname => $sync_snapname)");
            } else {
                $logfunc->("full sync '$volid' ($sync_snapname)");
            }

            replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure, $logfunc);
        }
    };
    $err = $@;

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $err;
    }

    # remove old snapshots because they are no longer needed
    $cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);

    remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);

    return $volumes;
}
305
# Run a single replication job without taking the guest migration lock; the
# lock is taken by run_replication(), which calls this.
#
# Records the job start in the job state, opens (and truncates) the job log
# file, runs replicate() and records the job end together with the run time
# and any error. Every message is written to the job log file, prefixed with
# the time stamp and job ID. If $logfunc is set, the message is also passed
# to it, with that same prefix only when $verbose is true.
#
# Returns the result of replicate(). Dies if the log file cannot be opened,
# and re-throws any error raised by replicate() after it has been recorded.
my $run_replication_nolock = sub {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;

    my $jobid = $jobcfg->{id};

    my $volumes;

    # errors raised by replicate() are written to the job log and passed to
    # record_job_end(), and are then re-thrown to the caller

    my $state = PVE::ReplicationState::read_job_state($jobcfg);

    PVE::ReplicationState::record_job_start($jobcfg, $state, $start_time, $iteration);

    my $t0 = [gettimeofday];

    # return value is not checked - the directory may already exist, and a
    # real problem is reported by the open() below
    mkdir $PVE::ReplicationState::replicate_logdir;
    my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
    open(my $logfd, '>', $logfile) ||
        die "unable to open replication log '$logfile' - $!\n";

    my $logfunc_wrapper = sub {
        my ($msg) = @_;

        my $ctime = get_log_time();
        print $logfd "$ctime $jobid: $msg\n";
        if ($logfunc) {
            if ($verbose) {
                $logfunc->("$ctime $jobid: $msg");
            } else {
                $logfunc->($msg);
            }
        }
    };

    $logfunc_wrapper->("start replication job");

    eval {
        $volumes = replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
    };
    my $err = $@;

    if ($err) {
        my $msg = "end replication job with error: $err";
        chomp $msg;
        $logfunc_wrapper->($msg);
    } else {
        $logfunc_wrapper->("end replication job");
    }

    PVE::ReplicationState::record_job_end($jobcfg, $state, $start_time, tv_interval($t0), $err);

    close($logfd);

    die $err if $err;

    return $volumes;
};
365
# Run a replication job while holding the guest migration lock.
#
# Takes the lock for guest $jobcfg->{guest} via
# PVE::GuestHelpers::guest_migration_lock() with a timeout value of 2 and
# runs the job inside it. All arguments are passed on unchanged.
#
# Returns whatever the locked job run returns.
sub run_replication {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;

    my $volumes;

    my $timeout = 2; # do not wait too long - we repeat periodically anyways
    $volumes = PVE::GuestHelpers::guest_migration_lock(
        $jobcfg->{guest}, $timeout, $run_replication_nolock,
        $guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose);

    return $volumes;
}
378
1;