]> git.proxmox.com Git - pve-manager.git/blame - PVE/Replication.pm
use new replication helpers from pve-guest-common
[pve-manager.git] / PVE / Replication.pm
CommitLineData
892821fd
DM
1package PVE::Replication;
2
3use warnings;
4use strict;
5use Data::Dumper;
6use JSON;
7use Time::HiRes qw(gettimeofday tv_interval);
8
9use PVE::INotify;
483f89dd 10use PVE::ProcFSTools;
892821fd
DM
11use PVE::Tools;
12use PVE::Cluster;
892821fd 13use PVE::Storage;
1a9dc09e 14use PVE::GuestHelpers;
892821fd 15use PVE::ReplicationConfig;
d255af01 16use PVE::ReplicationState;
892821fd 17
f842e812
DM
18
# Returns the timestamp (epoch seconds, via time()) that prefixes every
# replication log line. It is a separate sub so regression tests can
# override it and get reproducible log output.
sub get_log_time {
    my $now = time();
    return $now;
}
892821fd 24
# Runs 'pvesr prepare-local-job' on the replication target via SSH.
#
# Parameters:
#   $ssh_info        - SSH info for the target node (converted with
#                      PVE::Cluster::ssh_info_to_command)
#   $jobid           - replication job ID
#   $vmid            - guest ID (not used in this sub)
#   $volumes         - array ref of volume IDs, appended as positional args
#   $storeid_list    - array ref of storage IDs, passed as '--scan a,b,...'
#                      (omitted when empty)
#   $last_sync       - timestamp passed as '--last_sync'
#   $parent_snapname - optional, passed as '--parent_snapname' when true
#   $force           - adds '--force' when true
#   $logfunc         - receives every remote stderr line, prefixed with
#                      '(remote_prepare_local_job) '
#
# Returns the JSON-decoded remote stdout (replicate() uses it as
# { volid => { snapname => 1 } }). If several stdout lines arrive, the last
# one wins. Dies if the remote side printed nothing.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);

    my @pvesr_args = ('pvesr', 'prepare-local-job', $jobid);
    push @pvesr_args, '--scan', join(',', @$storeid_list) if @$storeid_list;
    push @pvesr_args, @$volumes if @$volumes;
    push @pvesr_args, '--last_sync', $last_sync;
    push @pvesr_args, '--parent_snapname', $parent_snapname if $parent_snapname;
    push @pvesr_args, '--force' if $force;

    my $cmd = [@$ssh_cmd, '--', @pvesr_args];

    my $result;
    my $collect_json = sub {
        my ($line) = @_;
        $result = JSON::decode_json($line);
    };

    my $log_stderr = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_prepare_local_job) $line");
    };

    PVE::Tools::run_command($cmd, outfunc => $collect_json, errfunc => $log_stderr);

    die "prepare remote node failed - no result\n" if !defined($result);

    return $result;
}
58
# Runs 'pvesr finalize-local-job' on the replication target via SSH and
# passes the new sync timestamp as '--last_sync'.
#
# Both stdout and stderr of the remote command are forwarded to $logfunc,
# prefixed with '(remote_finalize_local_job) '. $vmid is accepted for
# symmetry with remote_prepare_local_job but is not used. Returns whatever
# PVE::Tools::run_command returns.
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
    my @remote_cmd = ('pvesr', 'finalize-local-job', $jobid, @$volumes, '--last_sync', $last_sync);

    my $forward = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_finalize_local_job) $line");
    };

    return PVE::Tools::run_command([@$ssh_cmd, '--', @remote_cmd], outfunc => $forward, errfunc => $forward);
}
74
91ee6a2f
DM
# Scans the local snapshots of each volume for this replication job.
#
# Snapshots are handled as follows:
#   - The snapshot named for $last_sync (from
#     PVE::ReplicationState::replication_snapshot_name) is kept and recorded.
#   - $parent_snapname, if defined, is kept and recorded.
#   - Any other snapshot whose name starts with this job's replication
#     prefix is treated as stale and deleted (logged via $logfunc).
# $last_sync defaults to 0.
#
# In list context, returns ($last_snapshots, $cleaned_replicated_volumes):
#   $last_snapshots             - { volid => { snapname => 1 } } of kept snapshots
#   $cleaned_replicated_volumes - { volid => 1 } for volumes where a stale
#                                 snapshot was deleted
# In scalar context, returns only $last_snapshots.
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;

    $last_sync //= 0;

    my ($prefix, $snapname) =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);

    my $is_kept = sub {
        my ($snap) = @_;
        return 1 if $snap eq $snapname;
        return defined($parent_snapname) && $snap eq $parent_snapname;
    };

    my %kept;
    my %cleaned;
    for my $volid (@$volids) {
        my $snapshots = PVE::Storage::volume_snapshot_list($storecfg, $volid);
        for my $snap (@$snapshots) {
            if ($is_kept->($snap)) {
                $kept{$volid}{$snap} = 1;
                next;
            }
            # only snapshots created by this job are removed
            next if $snap !~ m/^\Q$prefix\E/;

            $logfunc->("delete stale replication snapshot '$snap' on $volid");
            PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
            $cleaned{$volid} = 1;
        }
    }

    return wantarray ? (\%kept, \%cleaned) : \%kept;
}
f70997ea 102
# Sends one volume to the replication target with
# PVE::Storage::storage_migrate.
#
# $base_snapshot is the common snapshot to send incrementally from;
# replicate() passes undef when it wants a full sync. $sync_snapname is the
# freshly created snapshot to transfer. Returns whatever storage_migrate
# returns.
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure) = @_;

    my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);

    # FIXME: $rate and $insecure are accepted but not passed on yet
    return PVE::Storage::storage_migrate(
        $storecfg, $volid, $ssh_info, $storeid, $volname,
        $base_snapshot, $sync_snapname);
}
112
f9d38c54 113
# Runs a single 'local' replication job for one guest. Dies on error.
#
# Parameters:
#   $guest_class - guest plugin class; this sub calls its load_config,
#                  __snapshot_check_freeze_needed, __snapshot_freeze and
#                  get_replicatable_volumes methods
#   $jobcfg      - job config (id, type, guest, vmtype, target, rate,
#                  remove_job are read here)
#   $state       - job state hash; last_sync and storeid_list are read, and
#                  storeid_list is rewritten
#   $start_time  - timestamp of this run; names the new replication snapshot
#   $logfunc     - log callback
#
# Removal mode ($jobcfg->{remove_job} set):
#   - For mode 'full' with a remote target, removes the remote volumes.
#   - Removes all local replication snapshots.
#   - Deletes the job from the replication config.
#
# Normal mode:
#   - Snapshots all replicatable volumes, freezing the guest filesystem
#     while doing so if needed.
#   - Sends each volume to the target, incrementally when both sides share
#     the last-sync or parent snapshot.
#   - Drops the previous local replication snapshot and finalizes on the
#     remote side.
sub replicate {
    my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;

    my $local_node = PVE::INotify::nodename();

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    my $migration_network;
    my $migration_type = 'secure';
    if (my $mc = $dc_conf->{migration}) {
        $migration_network = $mc->{network};
        $migration_type = $mc->{type} if defined($mc->{type});
    }

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();
    my $last_sync = $state->{last_sync};

    die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
        if $start_time <= $last_sync;

    my $vmid = $jobcfg->{guest};
    my $vmtype = $jobcfg->{vmtype};

    my $conf = $guest_class->load_config($vmid);
    my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
    my $volumes = $guest_class->get_replicatable_volumes($storecfg, $conf);

    my $sorted_volids = [ sort keys %$volumes ];

    $running //= 0; # to avoid undef warnings from logfunc

    $logfunc->("guest => $vmid, type => $vmtype, running => $running");
    $logfunc->("volumes => " . join(',', @$sorted_volids));

    if (my $remove_job = $jobcfg->{remove_job}) {

        $logfunc->("start job removal - mode '${remove_job}'");

        if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
            # remove all remote volumes
            my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target});
            remote_prepare_local_job($ssh_info, $jobid, $vmid, [], $state->{storeid_list}, 0, undef, 1, $logfunc);
        }

        # remove all local replication snapshots (lastsync => 0)
        prepare($storecfg, $sorted_volids, $jobid, 0, undef, $logfunc);

        PVE::ReplicationConfig::delete_job($jobid); # update config
        $logfunc->("job removed");

        return;
    }

    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);

    my $last_sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
    my $sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);

    my $parent_snapname = $conf->{parent};

    # test if we have a replication snapshot from the last sync
    # and remove all other/stale replication snapshots
    my $last_snapshots = prepare(
        $storecfg, $sorted_volids, $jobid, $last_sync, $parent_snapname, $logfunc);

    # prepare remote side
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, 0, $logfunc);

    # remember the storages used by this job (passed as '--scan' next time)
    my $storeid_hash = {};
    foreach my $volid (@$sorted_volids) {
        my ($storeid) = PVE::Storage::parse_volume_id($volid);
        $storeid_hash->{$storeid} = 1;
    }
    $state->{storeid_list} = [ sort keys %$storeid_hash ];

    # freeze filesystem for data consistency
    if ($freezefs) {
        $logfunc->("freeze guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 0);
    }

    # make snapshot of all volumes
    my $replicate_snapshots = {};
    eval {
        foreach my $volid (@$sorted_volids) {
            $logfunc->("create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    my $err = $@;

    # unfreeze immediately, even if snapshot creation failed
    if ($freezefs) {
        $guest_class->__snapshot_freeze($vmid, 1);
    }

    # best-effort deletion of $snapname on every volume key of $volid_hash
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;
        foreach my $volid (sort keys %$volid_hash) {
            $logfunc->("delete previous replication snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname); };
            warn $@ if $@;
        }
    };

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $err;
    }

    eval {
        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        foreach my $volid (@$sorted_volids) {
            my $base_snapname;

            # use a snapshot present on both sides as incremental base,
            # preferring the last-sync snapshot over the parent snapshot
            if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) {
                if ($last_snapshots->{$volid}->{$last_sync_snapname} &&
                    $remote_snapshots->{$volid}->{$last_sync_snapname}) {
                    $logfunc->("incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
                    $base_snapname = $last_sync_snapname;
                } elsif (defined($parent_snapname) &&
                         ($last_snapshots->{$volid}->{$parent_snapname} &&
                          $remote_snapshots->{$volid}->{$parent_snapname})) {
                    $logfunc->("incremental sync '$volid' ($parent_snapname => $sync_snapname)");
                    $base_snapname = $parent_snapname;
                }
            }

            $logfunc->("full sync '$volid' ($sync_snapname)") if !defined($base_snapname);
            replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
        }
    };
    $err = $@;

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $err;
    }

    # Remove the previous replication snapshot because it is no longer
    # needed. Only touch volumes that actually carry it: prepare() also
    # records volumes that merely hold the parent snapshot, and on the
    # first sync no last-sync snapshot exists at all. Deleting there would
    # only produce failing delete calls and spurious warnings.
    my $volids_with_last_sync = {
        map { $_ => 1 }
        grep { $last_snapshots->{$_}->{$last_sync_snapname} }
        keys %$last_snapshots
    };
    $cleanup_local_snapshots->($volids_with_last_sync, $last_sync_snapname);

    remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);

    return;
}
273
# Runs one replication job without taking the guest migration lock.
#
# Before the run it records pid, ptime, last_node, last_try and
# last_iteration in the job state, and writes a per-job log file (each line
# is prefixed with get_log_time() and the job ID, and is also forwarded to
# $logfunc when given). After the run it stores the duration and either the
# error with an incremented fail_count, or the new last_sync with
# fail_count reset.
#
# Job errors normally end up in the state file. The outer eval additionally
# catches unexpected failures (for example when the state file cannot be
# written) and reports them via warn().
my $run_replication_nolock = sub {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc) = @_;

    my $jobid = $jobcfg->{id};

    eval {
        my $state = PVE::ReplicationState::read_job_state($jobcfg);

        my $started_at = [gettimeofday];

        # mark the job as running on this node
        my $pid = $$;
        $state->{pid} = $pid;
        $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($pid);
        $state->{last_node} = PVE::INotify::nodename();
        $state->{last_try} = $start_time;
        $state->{last_iteration} = $iteration;
        $state->{storeid_list} //= [];

        PVE::ReplicationState::write_job_state($jobcfg, $state);

        mkdir $PVE::ReplicationState::replicate_logdir; # may already exist
        my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
        open(my $log_fh, '>', $logfile)
            or die "unable to open replication log '$logfile' - $!\n";

        my $log = sub {
            my ($msg) = @_;
            my $line = get_log_time() . " $jobid: $msg";
            print {$log_fh} "$line\n";
            $logfunc->($line) if $logfunc;
        };

        $log->("start replication job");

        eval { replicate($guest_class, $jobcfg, $state, $start_time, $log); };
        my $job_err = $@;

        $state->{duration} = tv_interval($started_at);
        delete $state->{pid};
        delete $state->{ptime};

        if (!$job_err) {
            $log->("end replication job");
            $state->{last_sync} = $start_time;
            $state->{fail_count} = 0;
            delete $state->{error};
            PVE::ReplicationState::write_job_state($jobcfg, $state);
        } else {
            chomp $job_err;
            $state->{fail_count}++;
            $state->{error} = "$job_err";
            PVE::ReplicationState::write_job_state($jobcfg, $state);
            $log->("end replication job with error: $job_err");
        }

        close($log_fh);
    };
    warn "$jobid: got unexpected replication job error - $@" if $@;
};
340
# Runs a replication job while holding the guest's migration lock
# (PVE::GuestHelpers::guest_migration_lock, timeout 2).
#
# Keep the lock wait short: jobs are retried periodically anyway. Any error
# raised while taking the lock or running the job is re-thrown. If $noerr
# is set, undef is returned instead.
sub run_replication {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_;

    my $lock_timeout = 2; # do not wait too long - we repeat periodically anyways
    my @job_args = ($guest_class, $jobcfg, $iteration, $start_time, $logfunc);

    eval {
        PVE::GuestHelpers::guest_migration_lock(
            $jobcfg->{guest}, $lock_timeout, $run_replication_nolock, @job_args);
    };
    my $err = $@;
    if ($err) {
        return undef if $noerr;
        die $err;
    }
}
355
3561;