]> git.proxmox.com Git - pve-storage.git/blob - PVE/ReplicationTools.pm
build: run tests when building deb
[pve-storage.git] / PVE / ReplicationTools.pm
1 package PVE::ReplicationTools;
2
3 use warnings;
4 use strict;
5 use Data::Dumper;
6 use JSON;
7
8 use PVE::INotify;
9 use PVE::Tools;
10 use PVE::Cluster;
11 use PVE::QemuConfig;
12 use PVE::QemuServer;
13 use PVE::LXC::Config;
14 use PVE::LXC;
15 use PVE::Storage;
16
# Replication job state is kept as JSON in a single file below this
# directory; the job functions in this module also lock on that file.
my $STATE_DIR = '/var/lib/pve-replica';
my $STATE_PATH = join('/', $STATE_DIR, 'pve-replica.state');
19
# Build the base ssh command line used to run a command as root on the host
# with the given address. ssh batch mode is enabled.
#
# Parameters:
#   $ip - address of the remote host.
#
# Returns an array reference; callers append the remote command to it.
my $get_ssh_cmd = sub {
    my ($ip) = @_;

    my @cmd = ('ssh', '-o', 'Batchmode=yes');
    push @cmd, "root\@$ip";

    return \@cmd;
};
25
# Load the configuration of a guest and determine whether it is running.
#
# Parameters:
#   $vmid - ID of the guest.
#
# Returns the list ($conf, $vm_type, $running), where $vm_type is either
# 'qemu' or 'lxc'. Dies if the guest is not part of the cluster guest list
# or has any other type.
sub get_guest_config {
    my ($vmid) = @_;

    my $vmlist = PVE::Cluster::get_vmlist();
    my $entry = $vmlist->{ids}->{$vmid};

    die "no such guest '$vmid'\n" if !defined($entry);

    my $vm_type = $entry->{type};

    if ($vm_type eq 'qemu') {
        my $conf = PVE::QemuConfig->load_config($vmid);
        my $running = PVE::QemuServer::check_running($vmid);
        return ($conf, $vm_type, $running);
    }

    if ($vm_type eq 'lxc') {
        my $conf = PVE::LXC::Config->load_config($vmid);
        my $running = PVE::LXC::check_running($vmid);
        return ($conf, $vm_type, $running);
    }

    die "internal error";
}
50
# Store the given state structure as JSON in the state file.
#
# The state directory is created first; the result of mkdir is not checked,
# so an already existing directory is not an error.
#
# Parameters:
#   $state - hash reference to serialize.
sub write_state {
    my ($state) = @_;

    mkdir $STATE_DIR;

    my $json = encode_json($state);

    return PVE::Tools::file_set_contents($STATE_PATH, $json);
}
58
# Read the replication state from the state file.
#
# Returns the decoded JSON content, or an empty hash reference when the
# state file does not exist or is empty.
sub read_state {

    if (-e $STATE_PATH) {
        my $raw = PVE::Tools::file_get_contents($STATE_PATH);

        return decode_json($raw) if $raw ne '';
    }

    return {};
}
69
# Determine the address to use for replication traffic to a cluster node.
#
# Starts with the address reported by PVE::Cluster::remote_node_ip. If
# datacenter.cfg sets 'storage_replication_network', the remote node is
# asked over ssh ('pvecm mtunnel --get_migration_ip') and an "ip: '<addr>'"
# line in its output replaces that address.
#
# Parameters:
#   $nodename - name of the remote cluster node.
#
# Returns the selected IP address.
sub get_node_ip {
    my ($nodename) = @_;

    my $node_ip = PVE::Cluster::remote_node_ip($nodename);

    my $datacenter = PVE::Cluster::cfs_read_file('datacenter.cfg');
    my $network = $datacenter->{storage_replication_network};

    # no dedicated replication network configured
    return $node_ip if !$network;

    my $cmd = $get_ssh_cmd->($node_ip);
    push @$cmd, '--', 'pvecm', 'mtunnel', '--get_migration_ip', '--migration_network', $network;

    my $parse_output = sub {
        my ($line) = @_;

        if ($line =~ m/^ip: '($PVE::Tools::IPRE)'$/) {
            $node_ip = $1;
        }
    };

    PVE::Tools::run_command($cmd, outfunc => $parse_output);

    return $node_ip;
}
92
# Collect the replication jobs of all guests located on the local node.
#
# Returns a hash reference mapping guest ID to a job hash with the keys
# limit, interval, tnode, lastsync, state and fail, copied from the state
# file. Guests without an entry in the state file are skipped.
sub get_all_jobs {

    my $vmlist = PVE::Cluster::get_vmlist();
    my $state = read_state();
    my $local_node = PVE::INotify::nodename();

    my @job_fields = qw(limit interval tnode lastsync state fail);

    my $jobs = {};

    foreach my $vmid (keys %{$vmlist->{ids}}) {
        next if $vmlist->{ids}->{$vmid}->{node} ne $local_node;

        my $vm_state = $state->{$vmid};
        next if !defined($vm_state);

        # copy only the known job fields (missing ones become undef)
        $jobs->{$vmid} = +{ map { ($_ => $vm_state->{$_}) } @job_fields };
    }

    return $jobs;
}
122
# Replicate all replicatable volumes of a guest to the target node of its job.
#
# A snapshot named 'replica_<time>' is created on every volume (with the
# guest filesystem frozen through the QEMU guest agent when it is running),
# sent to the target node - based on the snapshot of the previous sync when
# there is one - and finally the snapshot of the previous sync is removed.
# The job's state, failure counter and last sync time are kept in the state
# file.
#
# Parameters:
#   $vmid  - ID of the guest to replicate; a job entry must exist for it.
#   $param - hash reference; 'limit' overrides the guest's
#            replica_rate_limit, 'verbose' is handed to
#            PVE::Storage::volume_send.
#
# Returns the timestamp used for the new snapshot name. Dies on any error.
sub sync_guest {
    my ($vmid, $param) = @_;

    my $local_node = PVE::INotify::nodename();

    my $jobs = read_state();

    # Without a job entry the sync can never succeed (there is no target and
    # no 'lastsync'). Fail before touching the state file instead of
    # autovivifying a bogus job entry in it.
    die "no replication job configured for guest '$vmid'\n"
        if !defined($jobs->{$vmid});

    $jobs->{$vmid}->{state} = 'sync';
    write_state($jobs);

    my ($guest_conf, $vm_type, $running) = get_guest_config($vmid);

    # $job aliases the entry inside $jobs, so changing it changes what
    # write_state($jobs) stores.
    my $job = $jobs->{$vmid};
    my $tnode = $job->{tnode};

    my $qga = 0;
    if ($vm_type eq 'qemu' && defined($guest_conf->{agent})) {
        $qga = PVE::QemuServer::qga_check_running($vmid) if $running;
    }

    my $storecfg = PVE::Storage::config();
    # NOTE(review): called without $noerr; whether unsyncable disks raise an
    # error is decided by the guest config classes - confirm.
    my $disks = get_replicatable_volumes($storecfg, $guest_conf, $vm_type);

    # check that every involved storage is available on both nodes
    foreach my $volid (keys %$disks) {
        my ($storeid) = PVE::Storage::parse_volume_id($volid);

        my $store = $storecfg->{ids}->{$storeid};
        die "Storage $storeid not availible on node: $tnode\n"
            if $store->{nodes} && !$store->{nodes}->{$tnode};
        die "Storage $storeid not availible on node: $local_node\n"
            if $store->{nodes} && !$store->{nodes}->{$local_node};
    }

    my $limit = $param->{limit};
    $limit = $guest_conf->{replica_rate_limit} if !defined($limit);

    my $snap_time = time();

    # Treat a missing 'lastsync' like any other malformed value, without an
    # "uninitialized value" warning.
    my $lastsync_raw = defined($job->{lastsync}) ? $job->{lastsync} : '';
    die "Invalid synctime format: $lastsync_raw."
        if $lastsync_raw !~ m/^(\d+)$/;
    my $lastsync = $1;

    # a lastsync of 0 means there is no previous snapshot to base the send on
    my $incremental_snap = $lastsync ? "replica_$lastsync" : undef;

    my $snapname = "replica_$snap_time";

    # per volume: which steps (snapshot, synced) have completed so far
    my $disks_status = {};

    # Thaw the guest filesystem; a failure is reported but never fatal.
    my $thaw_guest = sub {
        print "Unfreeze guest filesystem\n";
        eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw"); };
        warn $@ if $@;
    };

    my $sync_job = sub {

        # Freeze the filesystem for data consistency. This is done only once
        # the lock is held: the guest is then neither kept frozen while
        # waiting for the lock nor left frozen when the lock cannot be
        # acquired at all.
        if ($qga) {
            print "Freeze guest filesystem\n";
            eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-freeze"); };
            warn $@ if $@;
        }

        # make snapshot of all volumes
        foreach my $volid (keys %$disks) {

            eval { PVE::Storage::volume_snapshot($storecfg, $volid, $snapname); };

            if (my $err = $@) {
                $thaw_guest->() if $qga;

                # Best effort: a failing cleanup must neither hide the
                # original error nor prevent the state update below.
                eval { cleanup_snapshot($disks_status, $snapname, $storecfg, $running); };
                warn $@ if $@;

                $job->{state} = 'error';
                write_state($jobs);

                die $err;
            }

            $disks_status->{$volid}->{snapshot} = 1;
        }

        $thaw_guest->() if $qga;

        my $ip = get_node_ip($tnode);

        foreach my $volid (keys %$disks) {

            eval {
                PVE::Storage::volume_send($storecfg, $volid, $snapname,
                                          $ip, $incremental_snap,
                                          $param->{verbose}, $limit);
                $job->{fail} = 0;
            };

            if (my $err = $@) {
                # best effort, see above
                eval { cleanup_snapshot($disks_status, $snapname, $storecfg, $running, $ip); };
                warn $@ if $@;

                $job->{fail}++;
                $job->{state} = 'error' if $job->{fail} > 3;
                write_state($jobs);

                die $err;
            }

            $disks_status->{$volid}->{synced} = 1;
        }

        # delete the snapshot of the previous sync, if there was one
        cleanup_snapshot($disks_status, $snapname, $storecfg, $running, $ip, $lastsync)
            if $lastsync != 0;

        $job->{lastsync} = $snap_time;
        $job->{state} = "ok";
        write_state($jobs);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 60, 0, $sync_job);
    die $@ if $@;

    return $snap_time;
}
256
# Forward an image transfer to the storage plugin stored in $vol->{plugin}.
#
# All five arguments are handed unchanged to the plugin's send_image method
# and its result is returned.
sub send_image {
    my ($vol, $param, $ip, $all_snaps_in_delta, $alter_path) = @_;

    return $vol->{plugin}->send_image($vol, $param, $ip, $all_snaps_in_delta, $alter_path);
}
263
# Create or re-enable the replication job of a guest and, unless told
# otherwise, run a first sync right away.
#
# Parameters:
#   $vmid    - ID of the guest.
#   $no_sync - when defined (any value), the initial sync is skipped.
#   $target  - target node; falls back to the guest's replica_target.
#
# Dies if no target is known, if the target is the local node, or if the
# initial sync fails (the job state is then set to 'error').
sub job_enable {
    my ($vmid, $no_sync, $target) = @_;

    my $local_node = PVE::INotify::nodename();

    my $update_state = sub {
        my $jobs = read_state();
        my $job = $jobs->{$vmid};
        my ($config) = get_guest_config($vmid);

        $job->{interval} = $config->{replica_interval} || 15;

        $job->{tnode} = $target || $config->{replica_target};
        die "Replication target must be set\n" if !defined($job->{tnode});

        die "Target and source node can't be the same\n"
            if $job->{tnode} eq $local_node;

        $job->{fail} = 0;

        if (!defined($job->{lastsync})) {
            # Continue from replica snapshots that already exist on the
            # volumes; 0 marks "never synced".
            $job->{lastsync} = get_lastsync($vmid) || 0;
        }

        $job->{state} = 'ok';
        $jobs->{$vmid} = $job;
        write_state($jobs);

        my $param = { verbose => 1 };

        eval {
            sync_guest($vmid, $param) if !defined($no_sync);
        };
        if (my $err = $@) {
            # sync_guest updates the state file itself (for example the
            # failure counter). Re-read it so that marking the job as failed
            # does not overwrite those changes with the copy loaded above.
            my $current = read_state();
            $current->{$vmid}->{state} = 'error';
            write_state($current);
            die $err;
        }
    };

    PVE::Tools::lock_file_full($STATE_PATH, 5, 0, $update_state);
    die $@ if $@;
}
314
# Mark the replication job of a guest as disabled (state 'off').
#
# Prints a notice and changes nothing when no job exists for the guest.
#
# Parameters:
#   $vmid - ID of the guest.
sub job_disable {
    my ($vmid) = @_;

    my $switch_off = sub {
        my $state = read_state();

        if (!defined($state->{$vmid})) {
            print "No replica service for $vmid\n";
            return;
        }

        $state->{$vmid}->{state} = 'off';
        write_state($state);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 5, 0, $switch_off);
    die $@ if $@;
}
333
# Remove the replication job of a guest from the state file.
#
# Prints a notice and changes nothing when no job exists for the guest.
#
# Parameters:
#   $vmid - ID of the guest.
sub job_remove {
    my ($vmid) = @_;

    my $drop_job = sub {
        my $state = read_state();

        if (!defined($state->{$vmid})) {
            print "No replica service for $vmid\n";
            return;
        }

        delete $state->{$vmid};
        write_state($state);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 5, 0, $drop_job);
    die $@ if $@;
}
352
# Dispatch to the guest-type specific get_replicatable_volumes method.
#
# Parameters:
#   $storecfg - storage configuration.
#   $conf     - guest configuration.
#   $vm_type  - 'qemu' or 'lxc'; anything else dies with "internal error".
#   $noerr    - passed through unchanged to the config class.
#
# Returns whatever the selected config class returns.
sub get_replicatable_volumes {
    my ($storecfg, $conf, $vm_type, $noerr) = @_;

    return PVE::QemuConfig->get_replicatable_volumes($storecfg, $conf, $noerr)
        if $vm_type eq 'qemu';

    return PVE::LXC::Config->get_replicatable_volumes($storecfg, $conf, $noerr)
        if $vm_type eq 'lxc';

    die "internal error";
}
364
# Remove replication data of all replicatable volumes of a guest.
#
# With a $regex, the snapshots returned by
# PVE::Storage::volume_snapshot_list for each volume are deleted - on the
# remote node when $node is given, locally otherwise. Without a $regex the
# volumes themselves are released on the remote node via 'pvesm free'; doing
# that without a $node is an internal error.
#
# Parameters:
#   $vmid  - ID of the guest.
#   $regex - snapshot filter handed to volume_snapshot_list, or undef.
#   $node  - remote node name, or undef for the local node.
sub destroy_all_snapshots {
    my ($vmid, $regex, $node) = @_;

    my $ip = defined($node) ? get_node_ip($node) : undef;

    my ($guest_conf, $vm_type, $running) = get_guest_config($vmid);

    my $storecfg = PVE::Storage::config();
    my $volumes = get_replicatable_volumes($storecfg, $guest_conf, $vm_type);

    # gather the snapshot lists of all volumes before deleting anything
    my %snaps_of;
    for my $volid (keys %$volumes) {
        $snaps_of{$volid} =
            PVE::Storage::volume_snapshot_list($storecfg, $volid, $regex, $node, $ip);
    }

    for my $volid (keys %snaps_of) {

        if (!defined($regex)) {
            # whole volumes are only ever released on a remote node
            die "internal error" if !$ip;

            my $cmd = $get_ssh_cmd->($ip);
            push @$cmd, '--', 'pvesm', 'free', $volid;

            PVE::Tools::run_command($cmd);
            next;
        }

        for my $snap (@{$snaps_of{$volid}}) {
            if ($ip) {
                PVE::Storage::volume_snapshot_delete_remote($storecfg, $volid, $snap, $ip);
            } else {
                PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap, $running);
            }
        }
    }

}
406
# Delete a replication snapshot from the given volumes.
#
# Parameters:
#   $disks         - hash reference keyed by volume ID; each value records
#                    the completed steps of the volume ('snapshot',
#                    'synced').
#   $snapname      - snapshot to delete, unless $lastsync_snap is true.
#   $storecfg      - storage configuration.
#   $running       - handed to PVE::Storage::volume_snapshot_delete.
#   $ip            - address of the remote node; remote snapshots are only
#                    touched when it is defined.
#   $lastsync_snap - when true, the snapshot 'replica_<lastsync_snap>' is
#                    deleted instead of $snapname; when defined, the
#                    deletion ignores the per-volume step flags.
sub cleanup_snapshot {
    my ($disks, $snapname, $storecfg, $running, $ip, $lastsync_snap) = @_;

    my $doomed_snap = $lastsync_snap ? "replica_$lastsync_snap" : $snapname;
    my $ignore_flags = defined($lastsync_snap);
    my $have_remote = defined($ip);

    for my $volid (keys %$disks) {

        # remote copy first, then the local snapshot
        PVE::Storage::volume_snapshot_delete_remote($storecfg, $volid, $doomed_snap, $ip)
            if $have_remote && ($ignore_flags || $disks->{$volid}->{synced});

        PVE::Storage::volume_snapshot_delete($storecfg, $volid, $doomed_snap, $running)
            if $ignore_flags || $disks->{$volid}->{snapshot};
    }
}
425
# Remove the replication setup of a guest completely.
#
# Deletes the local 'replica_' snapshots, releases the replicated volumes
# on the node named by the guest's replica_target, drops the job from the
# state file and strips the replication options from the guest
# configuration. Does nothing when the guest has no job in the state file.
#
# Parameters:
#   $vmid - ID of the guest.
sub destroy_replica {
    my ($vmid) = @_;

    my $code = sub {

        my $jobs = read_state();

        return if !defined($jobs->{$vmid});

        my ($guest_conf, $vm_type) = get_guest_config($vmid);

        destroy_all_snapshots($vmid, 'replica_');
        destroy_all_snapshots($vmid, undef, $guest_conf->{replica_target});

        delete($jobs->{$vmid});

        delete($guest_conf->{replica_rate_limit});
        # The interval option is named 'replica_interval' everywhere else in
        # this module; the misspelled key is still removed in case it exists.
        delete($guest_conf->{replica_interval});
        delete($guest_conf->{replica_rate_interval});
        delete($guest_conf->{replica_target});
        delete($guest_conf->{replica});

        if ($vm_type eq 'qemu') {
            PVE::QemuConfig->write_config($vmid, $guest_conf);
        } else {
            PVE::LXC::Config->write_config($vmid, $guest_conf);
        }
        write_state($jobs);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 30, 0, $code);
    die $@ if $@;
}
458
# Determine the time of the last sync from the replica snapshots that exist
# on the guest's replicatable volumes.
#
# Only the first entry of each volume's 'replica_' snapshot list is
# examined. Volumes without such a snapshot, or whose first snapshot name
# is not of the form 'replica_<digits>', are skipped.
#
# Parameters:
#   $vmid - ID of the guest.
#
# Returns the timestamp shared by the examined snapshots, or undef when none
# was found. Dies if the volumes disagree on the timestamp.
sub get_lastsync {
    my ($vmid) = @_;

    my ($conf, $vm_type) = get_guest_config($vmid);

    my $storecfg = PVE::Storage::config();
    my $sync_vol = get_replicatable_volumes($storecfg, $conf, $vm_type);

    my $time;
    foreach my $volid (keys %$sync_vol) {
        my $list =
            PVE::Storage::volume_snapshot_list($storecfg, $volid, 'replica_');

        my $snap = shift @$list;
        next if !$snap;

        # Only use the capture of a successful match: after a failed match
        # $1 would hold a stale or undefined value.
        next if $snap !~ m/^replica_(\d+)$/;
        my $snap_time = $1;

        die "snapshots are not coherent\n"
            if defined($time) && $time ne $snap_time;

        $time = $snap_time;
    }

    return $time;
}
482
# Return the first entry of a volume's 'replica_' snapshot list as reported
# by PVE::Storage::volume_snapshot_list, or undef when the list is empty.
#
# Parameters:
#   $volid - volume ID.
sub get_last_replica_snap {
    my ($volid) = @_;

    my $cfg = PVE::Storage::config();
    my $replica_snaps = PVE::Storage::volume_snapshot_list($cfg, $volid, 'replica_');

    my $first = shift @$replica_snaps;

    return $first;
}
491
# Apply a changed replication option of a guest to its replication job.
#
# Parameters:
#   $vmid  - ID of the guest.
#   $key   - option name: 'replica_target', 'replica_interval' or
#            'replica_rate_limit'.
#   $value - new value. A false value resets the interval to 15 and removes
#            the rate limit.
#
# Changing the target destroys the existing replica and enables a new job
# for the new target. The other options only update the state file, and
# only if a job exists for the guest. Dies on an unknown option name.
sub update_conf {
    my ($vmid, $key, $value) = @_;

    if ($key eq 'replica_target') {
        destroy_replica($vmid);
        job_enable($vmid, undef, $value);
        return;
    }

    my $update = sub {
        my $jobs = read_state();

        return if !defined($jobs->{$vmid});

        if ($key eq 'replica_interval') {
            $jobs->{$vmid}->{interval} = $value || 15;
        } elsif ($key eq 'replica_rate_limit') {
            # Set and delete must be separate statements: assigning the
            # result of 'delete' back to the key would restore the old
            # limit instead of clearing it.
            if ($value) {
                $jobs->{$vmid}->{limit} = $value;
            } else {
                delete $jobs->{$vmid}->{limit};
            }
        } else {
            die "Config parameter $key not known";
        }

        write_state($jobs);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 60, 0, $update);
    die $@ if $@;
}
521
522
523 1;