]> git.proxmox.com Git - pve-guest-common.git/blob - PVE/Replication.pm
ed57fed7c12acb4844dbac77a5e19906dbcb7c6c
[pve-guest-common.git] / PVE / Replication.pm
1 package PVE::Replication;
2
3 use warnings;
4 use strict;
5 use Data::Dumper;
6 use JSON;
7 use Time::HiRes qw(gettimeofday tv_interval);
8
9 use PVE::INotify;
10 use PVE::ProcFSTools;
11 use PVE::Tools;
12 use PVE::Cluster;
13 use PVE::Storage;
14 use PVE::GuestHelpers;
15 use PVE::ReplicationConfig;
16 use PVE::ReplicationState;
17
18
# Timestamp (the value of time()) used to prefix replication log lines.
# Kept in its own sub so regression tests can overwrite it with a
# deterministic clock.
sub get_log_time {
    my $now = time;
    return $now;
}
24
# Run 'pvesr prepare-local-job' for job $jobid on the remote node described
# by $ssh_info.
#
# $volumes         - array ref of volume IDs passed as positional arguments
# $storeid_list    - array ref of storage IDs, passed as '--scan a,b,...'
#                    when non-empty
# $last_sync       - passed as '--last_sync'
# $parent_snapname - passed as '--parent_snapname' when true
# $force           - adds '--force' when true
# $logfunc         - receives every stderr line of the remote command,
#                    prefixed with '(remote_prepare_local_job)'
#
# The remote command's stdout is decoded as JSON (the last decoded line
# wins) and returned. Dies if the remote side produced no output.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;

    my @pvesr_args = (
        'prepare-local-job', $jobid,
        (@$storeid_list ? ('--scan', join(',', @$storeid_list)) : ()),
        @$volumes,
        '--last_sync', $last_sync,
        ($parent_snapname ? ('--parent_snapname', $parent_snapname) : ()),
        ($force ? ('--force') : ()),
    );
    my $cmd = [ @{ PVE::Cluster::ssh_info_to_command($ssh_info) }, '--', 'pvesr', @pvesr_args ];

    my $result;

    my $collect_json = sub {
        my ($line) = @_;
        $result = JSON::decode_json($line);
    };

    my $log_stderr = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_prepare_local_job) $line");
    };

    PVE::Tools::run_command($cmd, outfunc => $collect_json, errfunc => $log_stderr);

    die "prepare remote node failed - no result\n" if !defined($result);

    return $result;
}
58
# Run 'pvesr finalize-local-job' for job $jobid on the remote node described
# by $ssh_info, passing the volume IDs in $volumes and '--last_sync $last_sync'.
# Both stdout and stderr lines of the remote command are forwarded to
# $logfunc, prefixed with '(remote_finalize_local_job)'.
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;

    my @cmd = (
        @{ PVE::Cluster::ssh_info_to_command($ssh_info) },
        '--', 'pvesr', 'finalize-local-job', $jobid,
        @$volumes,
        '--last_sync', $last_sync,
    );

    my $forward_to_log = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_finalize_local_job) $line");
    };

    PVE::Tools::run_command(\@cmd, outfunc => $forward_to_log, errfunc => $forward_to_log);
}
74
# Scan the snapshots of every volume in $volids (array ref of volume IDs).
#
# A snapshot is kept and reported when it is the replication snapshot of
# $jobid for $last_sync (default 0), or when it equals $parent_snapname.
# Every other snapshot whose name starts with the replication prefix is
# deleted as stale (logged through $logfunc). Without $jobid the generic
# prefix '__replicate_' is used and no snapshot is kept unless it matches
# $parent_snapname.
#
# In list context returns ($last_snapshots, $cleaned_replicated_volumes):
#   $last_snapshots             - { volid => { snapname => 1 } } for kept snapshots
#   $cleaned_replicated_volumes - { volid => 1 } for volumes where a snapshot was deleted
# In scalar context only $last_snapshots is returned.
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;

    $last_sync //= 0;

    my ($prefix, $snapname) = defined($jobid)
        ? PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync)
        : ('__replicate_');

    my $kept = {};
    my $cleaned = {};

    for my $volid (@$volids) {
        my $snapshots = PVE::Storage::volume_snapshot_list($storecfg, $volid);

        for my $snap (@$snapshots) {
            my $wanted = (defined($snapname) && $snap eq $snapname)
                || (defined($parent_snapname) && $snap eq $parent_snapname);

            if ($wanted) {
                $kept->{$volid}->{$snap} = 1;
                next;
            }

            # leave snapshots that are not replication snapshots alone
            next if $snap !~ m/^\Q$prefix\E/;

            $logfunc->("delete stale replication snapshot '$snap' on $volid");
            PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
            $cleaned->{$volid} = 1;
        }
    }

    return wantarray ? ($kept, $cleaned) : $kept;
}
108
# Send volume $volid to the node described by $ssh_info via
# PVE::Storage::storage_migrate, using the same storage ID and volume name.
# $base_snapshot (may be undef) is the incremental base, $sync_snapname the
# snapshot to transfer. Returns whatever storage_migrate returns.
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure) = @_;

    my ($sid, $vname) = PVE::Storage::parse_volume_id($volid);

    # FIXME: $rate and $insecure are accepted but not passed on yet
    return PVE::Storage::storage_migrate(
        $storecfg, $volid, $ssh_info, $sid, $vname,
        $base_snapshot, $sync_snapname,
    );
}
118
119
# Run a single replication cycle for the job described by $jobcfg.
#
# $guest_class - guest class providing load_config, get_replicatable_volumes,
#                __snapshot_check_freeze_needed and __snapshot_freeze
# $jobcfg      - job configuration (id, type, guest, vmtype, target, rate,
#                remove_job)
# $state       - job state; last_sync and storeid_list are read, and
#                storeid_list is refreshed with the storages of the volumes
# $start_time  - timestamp of this run; names the new replication snapshot
#                and is handed to the remote finalize step
# $logfunc     - callback receiving one log message per call
#
# If $jobcfg->{remove_job} is set, the job's replication snapshots are
# removed instead (on the target too when the mode is 'full' and the target
# is another node), the job is deleted from the configuration and the sub
# returns.
#
# Dies on error. Local snapshots created by a failed run are removed again;
# the remote side is left for the next prepare-local-job run to clean up.
sub replicate {
    my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;

    my $local_node = PVE::INotify::nodename();

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    my $migration_network;
    my $migration_type = 'secure';
    if (my $mc = $dc_conf->{migration}) {
        $migration_network = $mc->{network};
        $migration_type = $mc->{type} if defined($mc->{type});
    }

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();
    my $last_sync = $state->{last_sync};

    die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
        if $start_time <= $last_sync;

    my $vmid = $jobcfg->{guest};
    my $vmtype = $jobcfg->{vmtype};

    my $conf = $guest_class->load_config($vmid);
    my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
    my $volumes = $guest_class->get_replicatable_volumes($storecfg, $conf, defined($jobcfg->{remove_job}));

    my $sorted_volids = [ sort keys %$volumes ];

    $running //= 0; # to avoid undef warnings from logfunc

    $logfunc->("guest => $vmid, type => $vmtype, running => $running");
    $logfunc->("volumes => " . join(',', @$sorted_volids));

    if (my $remove_job = $jobcfg->{remove_job}) {

        $logfunc->("start job removal - mode '${remove_job}'");

        if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
            # remove all remote volumes
            my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target});
            remote_prepare_local_job($ssh_info, $jobid, $vmid, [], $state->{storeid_list}, 0, undef, 1, $logfunc);
        }
        # remove all local replication snapshots (lastsync => 0)
        prepare($storecfg, $sorted_volids, $jobid, 0, undef, $logfunc);

        PVE::ReplicationConfig::delete_job($jobid); # update config
        $logfunc->("job removed");

        return;
    }

    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);

    my $last_sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
    my $sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);

    my $parent_snapname = $conf->{parent};

    # test if we have a replication snapshot from last sync
    # and remove all other/stale replication snapshots
    my $last_snapshots = prepare(
        $storecfg, $sorted_volids, $jobid, $last_sync, $parent_snapname, $logfunc);

    # prepare remote side
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, 0, $logfunc);

    # remember the storages holding the job's volumes; later runs pass this
    # list to remote_prepare_local_job as its '--scan' argument
    my $storeid_hash = {};
    foreach my $volid (@$sorted_volids) {
        my ($storeid) = PVE::Storage::parse_volume_id($volid);
        $storeid_hash->{$storeid} = 1;
    }
    $state->{storeid_list} = [ sort keys %$storeid_hash ];

    # best-effort removal of snapshot $snapname on every volume in $volid_hash
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;
        foreach my $volid (sort keys %$volid_hash) {
            $logfunc->("delete previous replication snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname); };
            warn $@ if $@;
        }
    };

    # make snapshot of all volumes, with the guest filesystem frozen for
    # data consistency. The freeze is inside the eval so that a failing
    # freeze still reaches the thaw below.
    my $replicate_snapshots = {};
    eval {
        if ($freezefs) {
            $logfunc->("freeze guest filesystem");
            $guest_class->__snapshot_freeze($vmid, 0);
        }
        foreach my $volid (@$sorted_volids) {
            $logfunc->("create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    my $err = $@;

    # unfreeze immediately - also after a failed freeze, in case it partially
    # took effect. A thaw error is only reported: it must neither hide $err
    # nor skip the cleanup of the snapshots created above.
    if ($freezefs) {
        eval { $guest_class->__snapshot_freeze($vmid, 1); };
        if (my $thaw_err = $@) {
            chomp $thaw_err;
            $logfunc->("unable to thaw guest filesystem - $thaw_err");
            warn "$thaw_err\n";
        }
    }

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $err;
    }

    eval {

        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        foreach my $volid (@$sorted_volids) {
            my $base_snapname;

            if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) {
                if ($last_snapshots->{$volid}->{$last_sync_snapname} &&
                    $remote_snapshots->{$volid}->{$last_sync_snapname}) {
                    $logfunc->("incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
                    $base_snapname = $last_sync_snapname;
                } elsif (defined($parent_snapname) &&
                         ($last_snapshots->{$volid}->{$parent_snapname} &&
                          $remote_snapshots->{$volid}->{$parent_snapname})) {
                    $logfunc->("incremental sync '$volid' ($parent_snapname => $sync_snapname)");
                    $base_snapname = $parent_snapname;
                }
            }

            $logfunc->("full sync '$volid' ($sync_snapname)") if !defined($base_snapname);
            replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
        }
    };
    $err = $@;

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $err;
    }

    # remove old snapshots because they are no longer needed. Only volumes
    # that actually carry the previous replication snapshot are touched;
    # prepare() also records volumes that only have the parent snapshot.
    my $volids_with_last_sync = {
        map { $_ => 1 }
        grep { $last_snapshots->{$_}->{$last_sync_snapname} }
        keys %$last_snapshots
    };
    $cleanup_local_snapshots->($volids_with_last_sync, $last_sync_snapname);

    remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
}
279
# Execute one replication run of $jobcfg. Does not take the guest migration
# lock itself; run_replication wraps it in that lock.
#
# The outcome of the run (last_sync, fail_count, error, duration, ...) is
# stored in the job state file and written to the job log. Failures outside
# the replication itself - for example when the state file cannot be
# written - are caught here and only reported via warn (i.e. they end up in
# syslog).
my $run_replication_nolock = sub {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc) = @_;

    my $jobid = $jobcfg->{id};

    eval {
        my $state = PVE::ReplicationState::read_job_state($jobcfg);

        my $started = [gettimeofday];

        # mark the job as being run by this process
        $state->{pid} = $$;
        $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($$);
        @$state{qw(last_node last_try last_iteration)} =
            (PVE::INotify::nodename(), $start_time, $iteration);
        $state->{storeid_list} //= [];

        PVE::ReplicationState::write_job_state($jobcfg, $state);

        # result deliberately unchecked - the directory may already exist
        mkdir $PVE::ReplicationState::replicate_logdir;
        my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
        open(my $log_fh, '>', $logfile)
            or die "unable to open replication log '$logfile' - $!\n";

        # each message goes to the job log and, if given, to $logfunc
        my $joblog = sub {
            my ($msg) = @_;
            my $line = get_log_time() . " $jobid: $msg";
            print $log_fh "$line\n";
            $logfunc->($line) if $logfunc;
        };

        $joblog->("start replication job");

        eval { replicate($guest_class, $jobcfg, $state, $start_time, $joblog); };
        my $replication_err = $@;

        $state->{duration} = tv_interval($started);
        delete @$state{qw(pid ptime)};

        if (!$replication_err) {
            $joblog->("end replication job");
            $state->{last_sync} = $start_time;
            $state->{fail_count} = 0;
            delete $state->{error};
            PVE::ReplicationState::write_job_state($jobcfg, $state);
        } else {
            chomp $replication_err;
            $state->{fail_count}++;
            $state->{error} = "$replication_err";
            PVE::ReplicationState::write_job_state($jobcfg, $state);
            $joblog->("end replication job with error: $replication_err");
        }

        close($log_fh);
    };
    if (my $unexpected = $@) {
        warn "$jobid: got unexpected replication job error - $unexpected";
    }
};
346
# Run replication job $jobcfg for guest $jobcfg->{guest} while holding
# PVE::GuestHelpers::guest_migration_lock for that guest.
#
# $guest_class, $jobcfg, $iteration, $start_time and $logfunc are handed
# through to the locked replication callback, which records replication
# errors in the job state itself. Errors that still escape the eval below
# (presumably raised by guest_migration_lock, e.g. when the lock cannot be
# obtained in time) are re-thrown, or swallowed with an undef return when
# $noerr is set.
#
# NOTE(review): there is no explicit return on success - the sub then
# returns the value of its final if-condition (a false value), so callers
# should not rely on the return value.
sub run_replication {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_;

    eval {
        my $timeout = 2; # do not wait too long - we repeat periodically anyway
        PVE::GuestHelpers::guest_migration_lock(
            $jobcfg->{guest}, $timeout, $run_replication_nolock,
            $guest_class, $jobcfg, $iteration, $start_time, $logfunc);
    };
    if (my $err = $@) {
        return undef if $noerr;
        die $err;
    }
}
361
362 1;