]> git.proxmox.com Git - pve-guest-common.git/blame - PVE/Replication.pm
PVE::Replication::find_common_replication_snapshot - new helper
[pve-guest-common.git] / PVE / Replication.pm
CommitLineData
a6538c1e
DM
1package PVE::Replication;
2
3use warnings;
4use strict;
5use Data::Dumper;
6use JSON;
7use Time::HiRes qw(gettimeofday tv_interval);
8
9use PVE::INotify;
10use PVE::ProcFSTools;
11use PVE::Tools;
12use PVE::Cluster;
13use PVE::Storage;
14use PVE::GuestHelpers;
15use PVE::ReplicationConfig;
16use PVE::ReplicationState;
17
18
# Timestamp (epoch seconds) prefixed to every replication log line.
# Kept as a separate sub so that regression tests can override it and
# obtain deterministic log output.
sub get_log_time {
    my $now = time();
    return $now;
}
24
e4f63016
DM
# Find, per volume, a common base replication snapshot that is available on
# both the local and the remote side.
#
# Parameters:
#   $ssh_info        - target node info, handed to remote_prepare_local_job()
#   $jobid, $vmid    - replication job ID and guest ID
#   $storecfg        - storage configuration
#   $volumes         - array ref of local volume IDs
#   $storeid_list    - array ref of storage IDs passed as '--scan' to the
#                      remote 'pvesr prepare-local-job'
#   $last_sync       - timestamp of the last successful sync
#   $parent_snapname - guest's parent snapshot (may be undef); used as the
#                      base when the last-sync snapshot is not on both sides
#   $logfunc         - logging callback
#
# Returns ($base_snapshots, $last_snapshots, $last_sync_snapname):
#   $base_snapshots     - { volid => snapname } for every volume that has a
#                         common base (volumes without one are absent)
#   $last_snapshots     - { volid => { snapname => 1 } } as returned by prepare()
#   $last_sync_snapname - replication snapshot name for $last_sync
#
# Note: this also removes stale replication snapshots (locally through
# prepare(); the remote side is prepared via 'pvesr prepare-local-job').
sub find_common_replication_snapshot {
    my ($ssh_info, $jobid, $vmid, $storecfg, $volumes, $storeid_list, $last_sync, $parent_snapname, $logfunc) = @_;

    # scalar context: only the snapshot name is needed here
    my $last_sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);

    # test if we have a replication_ snapshot from last sync
    # and remove all other/stale replication snapshots
    my $last_snapshots = prepare(
        $storecfg, $volumes, $jobid, $last_sync, $parent_snapname, $logfunc);

    # prepare remote side
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, 0, $logfunc);

    my $base_snapshots = {};

    foreach my $volid (@$volumes) {
        my $local_snaps = $last_snapshots->{$volid};
        my $remote_snaps = $remote_snapshots->{$volid};

        # without snapshot info on both sides there is no common base
        next if !defined($local_snaps) || !defined($remote_snaps);

        if ($local_snaps->{$last_sync_snapname} && $remote_snaps->{$last_sync_snapname}) {
            $base_snapshots->{$volid} = $last_sync_snapname;
        } elsif (defined($parent_snapname)
                 && $local_snaps->{$parent_snapname}
                 && $remote_snaps->{$parent_snapname}) {
            $base_snapshots->{$volid} = $parent_snapname;
        }
    }

    return ($base_snapshots, $last_snapshots, $last_sync_snapname);
}
62
a6538c1e
DM
# Run 'pvesr prepare-local-job' for the given job on the remote node via SSH
# and return the result it prints as JSON on stdout.
#
# $volumes and $storeid_list are array refs; the storage IDs are passed as a
# comma-separated '--scan' list (omitted when empty). '--parent_snapname' is
# only added when $parent_snapname is true, '--force' only when $force is
# true. $vmid is part of the interface but not used here.
#
# Every stdout line is decoded as JSON and the last decoded value is kept.
# stderr lines are forwarded to $logfunc. Dies if no result was decoded.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);

    my @cmd = (@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid);
    if (scalar(@$storeid_list)) {
        push @cmd, '--scan', join(',', @$storeid_list);
    }
    push @cmd, @$volumes;    # pushing an empty list is a no-op
    push @cmd, '--last_sync', $last_sync;
    push @cmd, '--parent_snapname', $parent_snapname if $parent_snapname;
    push @cmd, '--force' if $force;

    my $remote_snapshots;

    my $decode_result = sub {
        my ($line) = @_;
        $remote_snapshots = JSON::decode_json($line);
    };

    my $log_remote = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_prepare_local_job) $line");
    };

    PVE::Tools::run_command(\@cmd, outfunc => $decode_result, errfunc => $log_remote);

    if (!defined($remote_snapshots)) {
        die "prepare remote node failed - no result\n";
    }

    return $remote_snapshots;
}
96
# Run 'pvesr finalize-local-job' for the given job on the remote node via SSH,
# passing the volume list and the new '--last_sync' timestamp.
# Both stdout and stderr of the remote command are forwarded line by line to
# $logfunc. $vmid is part of the interface but not used here.
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
    my @remote_args = ('pvesr', 'finalize-local-job', $jobid, @$volumes, '--last_sync', $last_sync);
    my $cmd = [ @$ssh_cmd, '--', @remote_args ];

    my $forward_to_log = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_finalize_local_job) $line");
    };

    PVE::Tools::run_command($cmd, outfunc => $forward_to_log, errfunc => $forward_to_log);
}
112
# Scan the local snapshots of the given volumes.
#
# A snapshot is kept and reported when it is the replication snapshot for
# $last_sync (only known when $jobid is defined) or equals $parent_snapname.
# Any other snapshot whose name starts with the replication prefix is deleted
# as stale. Without a $jobid the generic prefix '__replicate_' is used.
# An undefined $last_sync is treated as 0.
#
# Returns $last_snapshots ({ volid => { snapname => 1 } }) in scalar context,
# or ($last_snapshots, $cleaned_replicated_volumes) in list context, where
# the second value is { volid => 1 } for each volume a snapshot was deleted on.
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;

    $last_sync //= 0;

    # the conditional operator propagates list context, so with a job ID both
    # the prefix and the snapshot name are returned
    my ($prefix, $snapname) = defined($jobid)
        ? PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync)
        : ('__replicate_');

    my $last_snapshots = {};
    my $cleaned_replicated_volumes = {};

    for my $volid (@$volids) {
        my $snapshots = PVE::Storage::volume_snapshot_list($storecfg, $volid);

        for my $snap (@$snapshots) {
            my $keep = (defined($snapname) && $snap eq $snapname)
                || (defined($parent_snapname) && $snap eq $parent_snapname);

            if ($keep) {
                $last_snapshots->{$volid}->{$snap} = 1;
                next;
            }

            # not a replication snapshot - leave it alone
            next if $snap !~ m/^\Q$prefix\E/;

            $logfunc->("delete stale replication snapshot '$snap' on $volid");
            PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
            $cleaned_replicated_volumes->{$volid} = 1;
        }
    }

    return wantarray ? ($last_snapshots, $cleaned_replicated_volumes) : $last_snapshots;
}
146
# Transfer a single volume to the target node via
# PVE::Storage::storage_migrate().
#
# $base_snapshot - base snapshot for an incremental transfer (undef for full)
# $sync_snapname - snapshot to transfer
# $rate          - optional rate limit in units of 1,000,000 bytes/s; a false
#                  value means no limit
# $insecure      - forwarded unchanged to storage_migrate()
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure) = @_;

    my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);

    # Never write 'my $x = ... if COND': a 'my' modified by a statement
    # modifier conditional has undefined behaviour (perlsyn).
    my $ratelimit_bps = $rate ? int(1000000 * $rate) : undef;

    PVE::Storage::storage_migrate($storecfg, $volid, $ssh_info, $storeid, $volname,
        $base_snapshot, $sync_snapname, $ratelimit_bps, $insecure);
}
156
157
# Run one replication job for a guest.
#
# If the job is marked for removal ('remove_job'), its local replication
# snapshots (and, for mode 'full' with a remote target, the remote volumes)
# are removed and the job is deleted from the config. Otherwise all
# replicatable volumes are snapshotted (with the guest filesystem frozen if
# needed) and sent to the target node, incrementally where a common base
# snapshot exists.
#
# $guest_class - guest class providing load_config(), get_replicatable_volumes()
#                and the __snapshot_* freeze helpers
# $jobcfg      - job config (uses id, type, guest, vmtype, target, rate, remove_job)
# $state       - job state; 'storeid_list' is updated in place
# $start_time  - timestamp of this run, used to name the new snapshot
# $logfunc     - logging callback
#
# Dies on error. Local snapshots created by this run are removed again on
# failure; the remote side is cleaned up by the next prepare_local_job run.
sub replicate {
    my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;

    my $local_node = PVE::INotify::nodename();

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    my $migration_network;
    my $migration_type = 'secure';
    if (my $mc = $dc_conf->{migration}) {
        $migration_network = $mc->{network};
        $migration_type = $mc->{type} if defined($mc->{type});
    }

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();
    my $last_sync = $state->{last_sync};

    die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
        if $start_time <= $last_sync;

    my $vmid = $jobcfg->{guest};
    my $vmtype = $jobcfg->{vmtype};

    my $conf = $guest_class->load_config($vmid);
    my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
    my $volumes = $guest_class->get_replicatable_volumes($storecfg, $vmid, $conf, defined($jobcfg->{remove_job}));

    my $sorted_volids = [ sort keys %$volumes ];

    $running //= 0; # to avoid undef warnings from logfunc

    $logfunc->("guest => $vmid, type => $vmtype, running => $running");
    $logfunc->("volumes => " . join(',', @$sorted_volids));

    if (my $remove_job = $jobcfg->{remove_job}) {

        $logfunc->("start job removal - mode '${remove_job}'");

        if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
            # remove all remote volumes
            my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target});
            remote_prepare_local_job($ssh_info, $jobid, $vmid, [], $state->{storeid_list}, 0, undef, 1, $logfunc);
        }

        # remove all local replication snapshots (lastsync => 0)
        prepare($storecfg, $sorted_volids, $jobid, 0, undef, $logfunc);

        PVE::ReplicationConfig::delete_job($jobid); # update config
        $logfunc->("job removed");

        return;
    }

    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);

    my $parent_snapname = $conf->{parent};

    my ($base_snapshots, $last_snapshots, $last_sync_snapname) = find_common_replication_snapshot(
        $ssh_info, $jobid, $vmid, $storecfg, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, $logfunc);

    my $storeid_hash = {};
    foreach my $volid (@$sorted_volids) {
        my ($storeid) = PVE::Storage::parse_volume_id($volid);
        $storeid_hash->{$storeid} = 1;
    }
    $state->{storeid_list} = [ sort keys %$storeid_hash ];

    # freeze filesystem for data consistency
    if ($freezefs) {
        $logfunc->("freeze guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 0);
    }

    # make snapshot of all volumes
    my $sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);

    my $replicate_snapshots = {};
    eval {
        foreach my $volid (@$sorted_volids) {
            $logfunc->("create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    my $err = $@;

    # unfreeze immediately
    if ($freezefs) {
        $guest_class->__snapshot_freeze($vmid, 1);
    }

    # best-effort removal of $snapname from every volume in $volid_hash;
    # failures are only warned about
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;
        foreach my $volid (sort keys %$volid_hash) {
            $logfunc->("delete previous replication snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname); };
            warn $@ if $@;
        }
    };

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $err;
    }

    eval {
        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        foreach my $volid (@$sorted_volids) {
            my $base_snapname = $base_snapshots->{$volid};

            if (defined($base_snapname)) {
                $logfunc->("incremental sync '$volid' ($base_snapname => $sync_snapname)");
            } else {
                $logfunc->("full sync '$volid' ($sync_snapname)");
            }

            replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
        }
    };
    $err = $@;

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $err;
    }

    # Remove old snapshots because they are no longer needed. Only volumes
    # that actually carry the last-sync snapshot are affected: $last_snapshots
    # can also list volumes that only have the parent snapshot, and deleting a
    # non-existent snapshot there would just fail and warn.
    my $old_sync_snapshots = {};
    foreach my $volid (keys %$last_snapshots) {
        $old_sync_snapshots->{$volid} = 1
            if $last_snapshots->{$volid}->{$last_sync_snapname};
    }
    $cleanup_local_snapshots->($old_sync_snapshots, $last_sync_snapname);

    remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);

    return;
}
300
# Body of run_replication(); called while the guest migration lock is held.
#
# Records in the job state that this process/node is running the job and
# writes it out. It then runs replicate(), logging to the per-job log file
# (and to $logfunc, if given), and finally stores the duration together with
# either the error (fail_count incremented) or the new last_sync (fail_count
# reset).
#
# Errors from replicate() are normally recorded in the state file. Any other
# unexpected error (for example when writing the state file fails) is caught
# and only emitted as a warning.
my $run_replication_nolock = sub {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc) = @_;

    my $jobid = $jobcfg->{id};

    eval {
        my $state = PVE::ReplicationState::read_job_state($jobcfg);

        my $timer_start = [gettimeofday];

        # mark the job as running on this node/process
        $state->{pid} = $$;
        $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($$);
        $state->{last_node} = PVE::INotify::nodename();
        $state->{last_try} = $start_time;
        $state->{last_iteration} = $iteration;
        $state->{storeid_list} //= [];
        PVE::ReplicationState::write_job_state($jobcfg, $state);

        # result deliberately unchecked - presumably the directory usually
        # exists already
        mkdir $PVE::ReplicationState::replicate_logdir;

        my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
        open(my $logfd, '>', $logfile)
            or die "unable to open replication log '$logfile' - $!\n";

        # write one timestamped line to the job log and mirror it to $logfunc
        my $log_line = sub {
            my ($msg) = @_;
            my $line = get_log_time() . " $jobid: $msg";
            print $logfd "$line\n";
            $logfunc->($line) if $logfunc;
        };

        $log_line->("start replication job");

        eval { replicate($guest_class, $jobcfg, $state, $start_time, $log_line); };
        my $job_err = $@;

        $state->{duration} = tv_interval($timer_start);
        delete @$state{qw(pid ptime)};

        if ($job_err) {
            chomp $job_err;
            $state->{fail_count}++;
            $state->{error} = "$job_err";
            PVE::ReplicationState::write_job_state($jobcfg, $state);
            $log_line->("end replication job with error: $job_err");
        } else {
            $log_line->("end replication job");
            $state->{last_sync} = $start_time;
            $state->{fail_count} = 0;
            delete $state->{error};
            PVE::ReplicationState::write_job_state($jobcfg, $state);
        }

        close($logfd);
    };
    if (my $unexpected_err = $@) {
        warn "$jobid: got unexpected replication job error - $unexpected_err";
    }
};
367
# Run a replication job while holding the guest migration lock.
#
# The lock is waited for only briefly (2 seconds), because jobs are retried
# periodically anyway. If taking the lock or running the job dies, the
# error is re-thrown, unless $noerr is set, in which case undef is returned.
sub run_replication {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_;

    my $lock_timeout = 2; # do not wait too long - we repeat periodically anyways
    my @job_args = ($guest_class, $jobcfg, $iteration, $start_time, $logfunc);

    eval {
        PVE::GuestHelpers::guest_migration_lock(
            $jobcfg->{guest}, $lock_timeout, $run_replication_nolock, @job_args);
    };
    if (my $lock_err = $@) {
        return undef if $noerr;
        die $lock_err;
    }
}
382
3831;