]> git.proxmox.com Git - pve-manager.git/blob - PVE/Replication.pm
711b08ffd7a7838aa1230ceefb5219cd42bdd68c
[pve-manager.git] / PVE / Replication.pm
1 package PVE::Replication;
2
3 use warnings;
4 use strict;
5 use Data::Dumper;
6 use JSON;
7 use Time::HiRes qw(gettimeofday tv_interval);
8
9 use PVE::INotify;
10 use PVE::ProcFSTools;
11 use PVE::Tools;
12 use PVE::CalendarEvent;
13 use PVE::Cluster;
14 use PVE::AbstractConfig;
15 use PVE::QemuConfig;
16 use PVE::QemuServer;
17 use PVE::LXC::Config;
18 use PVE::LXC;
19 use PVE::Storage;
20 use PVE::GuestHelpers;
21 use PVE::ReplicationConfig;
22
# Location of the JSON file holding the per-job replication state.
# Note: declared with 'our' so that regression tests can overwrite it.
our $state_path = '/var/lib/pve-manager/pve-replication-state.json';
25
# Record $state as the state of the job described by $jobcfg inside
# $stateobj, then write the whole state object to $state_path as JSON.
#
# $stateobj - hash ref: { $vmid => { $target_id => $state } }
# $jobcfg   - job configuration (reads the 'type' and 'guest' keys)
# $state    - state hash ref to store for this job
my $update_job_state = sub {
    my ($stateobj, $jobcfg, $state) = @_;

    my $guest_id = $jobcfg->{guest};
    my $target_id = PVE::ReplicationConfig->lookup($jobcfg->{type})
        ->get_unique_target_id($jobcfg);

    # Note: tuple ($guest_id, $target_id) is unique
    $stateobj->{$guest_id}->{$target_id} = $state;

    my $json = encode_json($stateobj);
    PVE::Tools::file_set_contents($state_path, $json);
};
39
# Look up the state of the job described by $jobcfg in $stateobj.
#
# Returns the stored state hash ref, or a fresh empty hash if there is
# none. The counters/timestamps listed below are set to 0 when they are
# not defined yet, so callers can always use them numerically.
my $get_job_state = sub {
    my ($stateobj, $jobcfg) = @_;

    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
    my $target_id = $plugin->get_unique_target_id($jobcfg);

    my $job_state = $stateobj->{$jobcfg->{guest}}->{$target_id} || {};

    # last_iteration: iteration marker of the last scheduler pass
    # last_try:       last sync start time
    # last_sync:      last successful sync start time
    # fail_count:     number of consecutive failures
    for my $field (qw(last_iteration last_try last_sync fail_count)) {
        $job_state->{$field} //= 0;
    }

    return $job_state;
};
58
# Load the replication state object from $state_path.
#
# Returns an empty hash ref when the file does not exist or is empty,
# otherwise the decoded JSON content.
my $read_state = sub {
    if (-e $state_path) {
        my $content = PVE::Tools::file_get_contents($state_path);
        return decode_json($content) if $content ne '';
    }

    return {};
};
69
# Collect all replication jobs that are relevant for the local node.
#
# $stateobj - optional state object; read from $state_path when omitted
#
# Returns a hash ref { $jobid => $jobcfg }. Each returned $jobcfg is
# extended with the keys 'state', 'id', 'vmtype' and 'next_sync'.
# A 'next_sync' of 0 means that no next run could be computed; this is
# also the case once a job has failed three or more times in a row.
sub job_status {
    my ($stateobj) = @_;

    my $local_node = PVE::INotify::nodename();

    my $jobs = {};

    $stateobj = $read_state->() if !$stateobj;
    my $cfg = PVE::ReplicationConfig->new();
    my $vms = PVE::Cluster::get_vmlist();

    for my $jobid (sort keys %{$cfg->{ids}}) {
        my $jobcfg = $cfg->{ids}->{$jobid};
        my $vmid = $jobcfg->{guest};

        die "internal error - not implemented" if $jobcfg->{type} ne 'local';

        my $guest = $vms->{ids}->{$vmid};

        next if !$guest                          # skip non existing vms
            || $guest->{node} ne $local_node     # only consider guest on local node
            || $jobcfg->{target} eq $local_node  # never sync to local node
            || $jobcfg->{disable};

        my $state = $get_job_state->($stateobj, $jobcfg);
        $jobcfg->{state} = $state;
        $jobcfg->{id} = $jobid;
        $jobcfg->{vmtype} = $guest->{type};

        my $fail_count = $state->{fail_count};
        my $next_sync = 0;
        if (!$fail_count) {
            # no failure so far - follow the configured schedule
            my $schedule = $jobcfg->{schedule} || '*/15';
            my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
            $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $state->{last_try}) // 0;
        } elsif ($fail_count < 3) {
            # retry with a delay that grows by 5 minutes per failure
            $next_sync = $state->{last_try} + 5*60*$fail_count;
        }
        $jobcfg->{next_sync} = $next_sync;

        $jobs->{$jobid} = $jobcfg;
    }

    return $jobs;
}
122
# Select the next replication job that is due.
#
# $stateobj   - state object as used by job_status()
# $iteration  - marker of the current scheduler pass; jobs whose
#               'last_iteration' is already >= this value are skipped
# $start_time - current time; a job is due when its 'next_sync' is
#               non-zero and not later than $start_time
#
# Jobs are examined ordered by last_iteration, then next_sync, then guest
# ID (all ascending). The selected job gets its 'last_iteration' set to
# $iteration and its state is written back before it is returned.
#
# Returns the job configuration, or undef if no job is due.
my $get_next_job = sub {
    my ($stateobj, $iteration, $start_time) = @_;

    my $next_jobid;

    my $jobs = job_status($stateobj);

    my $sort_func = sub {
        my $joba = $jobs->{$a};
        my $jobb = $jobs->{$b};
        my $sa = $joba->{state};
        my $sb = $jobb->{state};
        # last_iteration is a number, so it must be compared numerically
        # (a string comparison would sort e.g. 900 after 1000)
        my $res = $sa->{last_iteration} <=> $sb->{last_iteration};
        return $res if $res != 0;
        $res = $joba->{next_sync} <=> $jobb->{next_sync};
        return $res if $res != 0;
        return $joba->{guest} <=> $jobb->{guest};
    };

    foreach my $jobid (sort $sort_func keys %$jobs) {
        my $jobcfg = $jobs->{$jobid};
        # already handled in this pass
        next if $jobcfg->{state}->{last_iteration} >= $iteration;
        if ($jobcfg->{next_sync} && ($start_time >= $jobcfg->{next_sync})) {
            $next_jobid = $jobid;
            last;
        }
    }

    return undef if !$next_jobid;

    my $jobcfg = $jobs->{$next_jobid};

    # remember that this job was picked in the current pass
    $jobcfg->{state}->{last_iteration} = $iteration;
    $update_job_state->($stateobj, $jobcfg, $jobcfg->{state});

    return $jobcfg;
};
160
# Build the name of the replication snapshot for job $jobid and sync
# time $last_sync.
#
# In list context returns ($prefix, $snapname), where $prefix is the part
# shared by all replication snapshots of that job. Otherwise returns only
# $snapname.
sub replication_snapshot_name {
    my ($jobid, $last_sync) = @_;

    my $prefix = 'replicate_' . $jobid . '_';
    my $snapname = $prefix . $last_sync . '_snap';

    return $snapname if !wantarray;
    return ($prefix, $snapname);
}
169
# Run 'pvesr prepare-local-job' on the target node via ssh.
#
# $ssh_info  - ssh connection info, passed to ssh_info_to_command()
# $volumes   - array ref of volume IDs
# $last_sync - passed as the '--last_sync' option
#
# Each output line of the command is decoded as JSON; the result of the
# last decoded line is returned. Dies if the command produced no result.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync) = @_;

    my @cmd = (
        @{ PVE::Cluster::ssh_info_to_command($ssh_info) },
        '--', 'pvesr', 'prepare-local-job', $jobid, $vmid, @$volumes,
        '--last_sync', $last_sync,
    );

    my $remote_snapshots;
    my $collect = sub {
        my ($line) = @_;
        $remote_snapshots = JSON::decode_json($line);
    };

    PVE::Tools::run_command(\@cmd, outfunc => $collect);

    die "prepare remote node failed - no result\n" if !defined($remote_snapshots);

    return $remote_snapshots;
}
191
# Run 'pvesr finalize-local-job' on the target node via ssh.
#
# $ssh_info  - ssh connection info, passed to ssh_info_to_command()
# $volumes   - array ref of volume IDs
# $last_sync - passed as the '--last_sync' option
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync) = @_;

    my @cmd = (
        @{ PVE::Cluster::ssh_info_to_command($ssh_info) },
        '--', 'pvesr', 'finalize-local-job', $jobid, $vmid, @$volumes,
        '--last_sync', $last_sync,
    );

    PVE::Tools::run_command(\@cmd);
}
201
# Remove stale replication snapshots of job $jobid from the given volumes
# and report which volumes still have the snapshot of the last sync.
#
# $storecfg   - storage configuration
# $volids     - array ref of volume IDs to check
# $jobid      - replication job ID
# $last_sync  - time of the last sync; determines the snapshot to keep
# $start_time - passed through to $logfunc
# $logfunc    - code ref called as $logfunc->($start_time, $message)
#
# Every snapshot carrying this job's prefix, except the one belonging to
# $last_sync, is deleted.
#
# Returns a hash ref { $volid => 1 } for all volumes on which the
# snapshot of $last_sync was found.
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $start_time, $logfunc) = @_;

    my ($prefix, $snapname) = replication_snapshot_name($jobid, $last_sync);

    my $last_snapshots = {};
    foreach my $volid (@$volids) {
        my $list = PVE::Storage::volume_snapshot_list($storecfg, $volid, $prefix);
        foreach my $snap (@$list) {
            if ($snap eq $snapname) {
                $last_snapshots->{$volid} = 1;
            } else {
                $logfunc->($start_time, "$jobid: delete stale snapshot '$snap' on $volid");
                PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
            }
        }
    }

    return $last_snapshots;
}
223
# Transfer one volume to the target node using storage_migrate().
#
# The volume keeps its storage ID and volume name on the target.
# $base_snapshot and $sync_snapname are handed on unchanged.
#
# fixme: handle $rate, $insecure ?? (both are currently ignored)
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure) = @_;

    my ($target_storeid, $target_volname) = PVE::Storage::parse_volume_id($volid);

    PVE::Storage::storage_migrate(
        $storecfg,
        $volid,
        $ssh_info,
        $target_storeid,
        $target_volname,
        $base_snapshot,
        $sync_snapname,
    );
}
233
# Replicate all replicatable volumes of a guest to the job's target node.
#
# $jobcfg     - job configuration; reads the keys 'type', 'target', 'id',
#               'guest', 'vmtype' and 'rate'
# $last_sync  - start time of the last successful sync
# $start_time - start time of this sync; must be greater than $last_sync
# $logfunc    - optional code ref called as $logfunc->($start_time, $msg)
#
# Steps:
#   1. prepare the remote side and remove stale local snapshots
#   2. snapshot all volumes (with frozen guest filesystem if the qemu
#      guest agent is running)
#   3. send each volume - incrementally if the snapshot of the last sync
#      exists on both sides, otherwise in full
#   4. remove the snapshots of the last sync locally and finalize the
#      remote side
#
# Dies on error. Snapshots created by this run are removed locally in
# that case (best effort).
sub replicate {
    my ($jobcfg, $last_sync, $start_time, $logfunc) = @_;

    $logfunc = sub {} if !$logfunc; # log nothing by default

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    my $migration_network;
    my $migration_type = 'secure';
    if (my $mc = $dc_conf->{migration}) {
        $migration_network = $mc->{network};
        $migration_type = $mc->{type} if defined($mc->{type});
    }

    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();

    die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
        if $start_time <= $last_sync;

    my $vmid = $jobcfg->{guest};
    my $vmtype = $jobcfg->{vmtype};

    my $conf;
    my $running;
    my $qga;
    my $volumes;

    if ($vmtype eq 'qemu') {
        $conf = PVE::QemuConfig->load_config($vmid);
        $running = PVE::QemuServer::check_running($vmid);
        $qga = PVE::QemuServer::qga_check_running($vmid)
            if $running && $conf->{agent};
        $volumes = PVE::QemuConfig->get_replicatable_volumes($storecfg, $conf);
    } elsif ($vmtype eq 'lxc') {
        $conf = PVE::LXC::Config->load_config($vmid);
        $running = PVE::LXC::check_running($vmid);
        $volumes = PVE::LXC::Config->get_replicatable_volumes($storecfg, $conf);
    } else {
        die "internal error";
    }

    # $volumes is a hash keyed by volume ID - use a stable order everywhere
    my $sorted_volids = [ sort keys %$volumes ];

    $logfunc->($start_time, "$jobid: guest => $vmid, type => $vmtype, running => $running");
    $logfunc->($start_time, "$jobid: volumes => " . join(',', @$sorted_volids));

    # prepare remote side
    # Note: pass the list of volume IDs, not the $volumes hash - the
    # callee dereferences this argument as an array
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $sorted_volids, $last_sync);

    # test if we have a replication_ snapshot from last sync
    # and remove all other/stale replication snapshots
    my $last_sync_snapname = replication_snapshot_name($jobid, $last_sync);
    my $sync_snapname = replication_snapshot_name($jobid, $start_time);

    my $last_snapshots = prepare(
        $storecfg, $sorted_volids, $jobid, $last_sync, $start_time, $logfunc);

    # freeze filesystem for data consistency
    if ($qga) {
        $logfunc->($start_time, "$jobid: freeze guest filesystem");
        PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-freeze");
    }

    # make snapshot of all volumes
    # (errors are caught so that the guest gets unfrozen in any case)
    my $replicate_snapshots = {};
    eval {
        foreach my $volid (@$sorted_volids) {
            $logfunc->($start_time, "$jobid: create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    my $err = $@;

    # unfreeze immediately
    if ($qga) {
        $logfunc->($start_time, "$jobid: unfreeze guest filesystem");
        eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw"); };
        warn $@ if $@; # ignore errors here, because we cannot fix it anyways
    }

    # delete snapshot $snapname on all volumes listed in $volid_hash;
    # failures are only reported as warnings
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;
        foreach my $volid (sort keys %$volid_hash) {
            $logfunc->($start_time, "$jobid: delete snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname, $running); };
            warn $@ if $@;
        }
    };

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $err;
    }

    eval {

        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        foreach my $volid (@$sorted_volids) {
            my $base_snapname;
            # an incremental sync needs the last snapshot on both sides
            if ($last_snapshots->{$volid} && $remote_snapshots->{$volid}) {
                $logfunc->($start_time, "$jobid: incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
                $base_snapname = $last_sync_snapname;
            } else {
                $logfunc->($start_time, "$jobid: full sync '$volid' ($sync_snapname)");
            }
            replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
        }
    };
    $err = $@;

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $err;
    }

    # remove old snapshots because they are no longer needed
    $cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);

    remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time);

    return;
}
369
# Run one replication job and keep its state up to date.
#
# $stateobj   - state object; updated and written back several times
# $jobcfg     - job configuration including the 'state' key, which is
#               removed from $jobcfg here
# $start_time - start time of this run
# $logfunc    - optional code ref called as $logfunc->($start_time, $msg)
#
# While the job runs, its state carries 'pid' and 'ptime' of the current
# process. Afterwards 'duration' is set, and either 'fail_count'/'error'
# are updated (on failure) or 'last_sync' is set and the failure
# information is reset (on success). Errors are logged, or emitted as a
# warning when no $logfunc is given - they are not propagated.
my $run_replication = sub {
    my ($stateobj, $jobcfg, $start_time, $logfunc) = @_;

    my $state = delete $jobcfg->{state};

    my $t0 = [gettimeofday];

    # cleanup stale pid/ptime state
    for my $guest_id (keys %$stateobj) {
        my $guest_states = $stateobj->{$guest_id};
        for my $target_id (keys %{$guest_states}) {
            my $stale = $guest_states->{$target_id};
            delete $stale->{pid};
            delete $stale->{ptime};
        }
    }

    # mark this job as running in the current process
    $state->{pid} = $$;
    $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid});
    $state->{last_try} = $start_time;
    $update_job_state->($stateobj, $jobcfg, $state);

    $logfunc->($start_time, "$jobcfg->{id}: start replication job") if $logfunc;

    eval {
        my $timeout = 2; # do not wait too long - we repeat periodically anyways
        PVE::GuestHelpers::guest_migration_lock(
            $jobcfg->{guest}, $timeout, \&replicate,
            $jobcfg, $state->{last_sync}, $start_time, $logfunc);
    };
    my $err = $@;

    $state->{duration} = tv_interval($t0);
    delete $state->{pid};
    delete $state->{ptime};

    if (!$err) {
        $logfunc->($start_time, "$jobcfg->{id}: end replication job") if $logfunc;
        $state->{last_sync} = $start_time;
        $state->{fail_count} = 0;
        delete $state->{error};
        $update_job_state->($stateobj, $jobcfg, $state);
    } else {
        $state->{fail_count}++;
        $state->{error} = "$err";
        $update_job_state->($stateobj, $jobcfg, $state);
        if ($logfunc) {
            chomp $err;
            $logfunc->($start_time, "$jobcfg->{id}: end replication job with error: $err");
        } else {
            warn $err;
        }
    }
};
423
# Run the replication job $jobid immediately, regardless of its schedule.
#
# $jobid   - ID of the job to run
# $now     - optional start time (defaults to the current time);
#            passing it is useful for regression testing
# $logfunc - optional code ref called as $logfunc->($start_time, $msg)
#
# The work is done while holding a lock on $state_path (60 second
# timeout). Dies if the job does not exist, is disabled, or cannot be
# run on this node.
sub run_single_job {
    my ($jobid, $now, $logfunc) = @_;

    my $local_node = PVE::INotify::nodename();

    my $code = sub {
        $now //= time();

        my $stateobj = $read_state->();

        my $cfg = PVE::ReplicationConfig->new();

        my $jobcfg = $cfg->{ids}->{$jobid};
        die "no such job '$jobid'\n" if !$jobcfg;

        die "internal error - not implemented" if $jobcfg->{type} ne 'local';

        die "job '$jobid' is disabled\n" if $jobcfg->{disable};

        my $vms = PVE::Cluster::get_vmlist();
        my $vmid = $jobcfg->{guest};

        my $guest = $vms->{ids}->{$vmid};
        die "no such guest '$vmid'\n" if !$guest;

        die "guest '$vmid' is not on local node\n"
            if $guest->{node} ne $local_node;

        die "unable to sync to local node\n" if $jobcfg->{target} eq $local_node;

        my $state = $get_job_state->($stateobj, $jobcfg);
        $jobcfg->{state} = $state;
        $jobcfg->{id} = $jobid;
        $jobcfg->{vmtype} = $guest->{type};

        # record this run before starting the actual replication
        $state->{last_iteration} = $now;
        $update_job_state->($stateobj, $jobcfg, $state);

        $run_replication->($stateobj, $jobcfg, $now, $logfunc);
    };

    PVE::Tools::lock_file($state_path, 60, $code);
    die $@ if $@;
}
466
# Run all replication jobs that are currently due, one after another.
#
# $now     - optional fixed time used as iteration marker and as start
#            time of every job (useful for regression testing); the
#            current time is used when omitted
# $logfunc - optional code ref called as $logfunc->($start_time, $msg)
#
# The work is done while holding a lock on $state_path (60 second
# timeout).
sub run_jobs {
    my ($now, $logfunc) = @_;

    my $iteration = $now // time();

    my $code = sub {
        my $stateobj = $read_state->();

        while (1) {
            # take a fresh start time for every job
            my $start_time = $now // time();
            my $jobcfg = $get_next_job->($stateobj, $iteration, $start_time)
                or last;
            $run_replication->($stateobj, $jobcfg, $start_time, $logfunc);
        }
    };

    PVE::Tools::lock_file($state_path, 60, $code);
    die $@ if $@;
}
485
486 1;