]> git.proxmox.com Git - pve-storage.git/blob - PVE/ReplicationTools.pm
pvesr: rename list => jobs, call API
[pve-storage.git] / PVE / ReplicationTools.pm
1 package PVE::ReplicationTools;
2
3 use warnings;
4 use strict;
5
6 use PVE::Tools qw(run_command);
7 use PVE::Cluster;
8 use PVE::QemuConfig;
9 use PVE::QemuServer;
10 use PVE::LXC::Config;
11 use PVE::LXC;
12 use PVE::Storage;
13 use Time::Local;
14 use JSON;
15 use Data::Dumper qw(Dumper);
16
# Directory that holds the replication state file.
my $STATE_DIR = '/var/lib/pve-replica';

# JSON state file, keyed by guest ID. The same path is handed to
# PVE::Tools::lock_file_full() by the subs below to serialize state changes.
my $STATE_PATH = $STATE_DIR . '/pve-replica.state';

# Runs once, when this module is loaded.
PVE::Cluster::cfs_update();

# Name of the node this code is running on.
my $local_node = PVE::INotify::nodename();

# Declared here but not referenced by any sub in this file.
my $cluster_nodes;
24
# Look up a guest in the cluster VM list and load its configuration.
#
# Parameters:
#   $vmid - ID of the guest.
#
# Returns the list ($guestconf, $type, $running):
#   $guestconf - configuration loaded through PVE::QemuConfig or
#                PVE::LXC::Config, depending on the guest type
#   $type      - 'qemu' or 'lxc' as recorded in the VM list
#   $running   - result of the matching check_running() call
# For any other type only $type is set; the other two values are undef.
#
# Dies if no guest ID is given or the guest is not in the VM list.
my $get_guestconfig = sub {
    my ($vmid) = @_;

    die "No guest ID given\n" if !defined($vmid);

    my $vms = PVE::Cluster::get_vmlist();

    # Fetch the entry first: chaining ->{type} directly onto a missing entry
    # would autovivify an empty hash inside the list returned by get_vmlist().
    my $entry = $vms->{ids}->{$vmid};
    my $type = $entry ? $entry->{type} : undef;

    die "Guest '$vmid' not found in the cluster VM list\n"
        if !defined($type);

    my $guestconf;
    my $running;

    if ($type eq 'qemu') {
        $guestconf = PVE::QemuConfig->load_config($vmid);
        $running = PVE::QemuServer::check_running($vmid);
    } elsif ($type eq 'lxc') {
        $guestconf = PVE::LXC::Config->load_config($vmid);
        $running = PVE::LXC::check_running($vmid);
    }

    return ($guestconf, $type, $running);
};
45
# Serialize the given state hash as JSON and store it in $STATE_PATH.
#
# Parameters:
#   $state - hash ref with the replication state, keyed by guest ID.
sub write_state {
    my ($state) = @_;

    # The result of mkdir is not checked here; a missing directory surfaces
    # when the file is written below.
    mkdir $STATE_DIR;

    my $json = JSON::encode_json($state);
    PVE::Tools::file_set_contents($STATE_PATH, $json);
}
53
# Load the replication state from $STATE_PATH.
#
# Returns the decoded JSON content, or an empty hash ref when the file does
# not exist or is empty.
sub read_state {
    if (-e $STATE_PATH) {
        my $content = PVE::Tools::file_get_contents($STATE_PATH);
        return JSON::decode_json($content) if $content ne '';
    }

    return {};
}
63
# Determine the IP address used to reach another cluster node.
#
# Parameters:
#   $nodename - name of the remote node.
#
# Returns the address from PVE::Cluster::remote_node_ip(). If datacenter.cfg
# sets 'storage_replication_network', the remote node is asked over ssh
# ('pvecm mtunnel --get_migration_ip') and the address it reports is returned
# instead, provided an "ip: '...'" line is found in the output.
sub get_node_ip {
    my ($nodename) = @_;

    my $remoteip = PVE::Cluster::remote_node_ip($nodename, 1);

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');
    my $network = $dc_conf->{storage_replication_network};
    return $remoteip if !$network;

    # The ssh connection itself still uses the address looked up above.
    my @cmd = (
        'ssh', '-o', 'Batchmode=yes', "root\@$remoteip", '--',
        'pvecm', 'mtunnel', '--get_migration_ip',
        '--migration_network', $network,
    );

    my $parse_line = sub {
        my ($line) = @_;
        $remoteip = $1 if $line =~ m/^ip: '($PVE::Tools::IPRE)'$/;
    };

    PVE::Tools::run_command(\@cmd, outfunc => $parse_line);

    return $remoteip;
}
86
# Collect the replication jobs of all guests located on the local node.
#
# Returns a hash ref keyed by guest ID. Each value is a new hash holding the
# fields limit, interval, tnode, lastsync, state and fail copied from the
# state file. Guests without an entry in the state file are left out.
sub get_all_jobs {
    my $vmlist = PVE::Cluster::get_vmlist();
    my $state = read_state();

    my @fields = qw(limit interval tnode lastsync state fail);

    my $jobs = {};
    for my $vmid (keys %{$vmlist->{ids}}) {
        my $guest = $vmlist->{ids}->{$vmid};
        next if $guest->{node} ne $local_node;

        my $vm_state = $state->{$vmid};
        next if !defined($vm_state);

        $jobs->{$vmid} = { map { $_ => $vm_state->{$_} } @fields };
    }

    return $jobs;
}
115
# Replicate all syncable volumes of one guest to the target node of its job.
#
# Steps, as implemented below:
#   1. mark the job as 'sync' in the state file
#   2. verify that the storage of every replicated volume is allowed on the
#      target node and on the local node
#   3. while holding the state file lock: freeze the guest file system (only
#      when the QEMU guest agent is running), snapshot every volume, thaw,
#      send each snapshot to the target, remove the previous replication
#      snapshot and record the new sync time
#
# Parameters:
#   $vmid  - ID of the guest to replicate
#   $param - hash ref; 'limit' is passed to volume_send() as rate limit
#            (falls back to replica_rate_limit from the guest config) and
#            'verbose' is passed through to volume_send()
#
# Returns the time stamp used in the new snapshot name "replica_<time>".
#
# Dies if a storage is not allowed on one of the nodes, if the stored
# lastsync value is not a number, if a snapshot or transfer fails, or if the
# state file lock cannot be obtained.
sub sync_guest {
    my ($vmid, $param) = @_;

    my $jobs = read_state();
    $jobs->{$vmid}->{state} = 'sync';
    write_state($jobs);

    my ($guest_conf, $vm_type, $running) = $get_guestconfig->($vmid);

    my $job = $jobs->{$vmid};
    my $tnode = $job->{tnode};

    # The guest agent is only queried for running QEMU guests that have an
    # 'agent' entry in their configuration.
    my $qga = 0;
    if ($vm_type eq "qemu" && defined($guest_conf->{agent})) {
        $qga = PVE::QemuServer::qga_check_running($vmid) if $running;
    }

    # will not die if a disk is not syncable
    my $disks = get_syncable_guestdisks($guest_conf, $vm_type);

    # check if all nodes have the storage available
    my $storage_config = PVE::Storage::config();
    foreach my $volid (keys %$disks) {
        my ($storeid) = PVE::Storage::parse_volume_id($volid);

        my $store = $storage_config->{ids}->{$storeid};
        die "Storage $storeid not availible on node: $tnode\n"
            if $store->{nodes} && !$store->{nodes}->{$tnode};
        die "Storage $storeid not availible on node: $local_node\n"
            if $store->{nodes} && !$store->{nodes}->{$local_node};
    }

    my $limit = $param->{limit};
    $limit = $guest_conf->{replica_rate_limit} if !defined($limit);

    my $snap_time = time();

    die "Invalid synctime format: $job->{lastsync}."
        if $job->{lastsync} !~ m/^(\d+)$/;

    # A lastsync of 0 means there is no previous snapshot to send an
    # incremental stream from.
    my $lastsync = $1;
    my $incremental_snap = $lastsync ? "replica_$lastsync" : undef;

    my $snapname = "replica_$snap_time";

    my $disks_status = { snapname => $snapname };

    # Thaw the guest file system again; a failure is only reported.
    my $thaw_guest = sub {
        return if !$qga;

        print "Unfreeze guest filesystem\n";
        eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw"); };
        warn $@ if $@;
    };

    my $sync_job = sub {

        # Freeze the file system for data consistency. This is done inside
        # the locked section on purpose: every path from here on thaws the
        # guest again, whereas a freeze issued before a failed lock attempt
        # would never be undone.
        if ($qga) {
            print "Freeze guest filesystem\n";
            eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-freeze"); };
            warn $@ if $@;
        }

        # make snapshot of all volumes
        foreach my $volid (keys %$disks) {

            eval {
                PVE::Storage::volume_snapshot($storage_config, $volid, $snapname);
            };

            if (my $err = $@) {
                $thaw_guest->();
                cleanup_snapshot($disks_status, $snapname, $storage_config, $running);
                $jobs->{$vmid}->{state} = 'error';
                write_state($jobs);

                die $err;
            }

            $disks_status->{$volid}->{snapshot} = 1;
        }

        $thaw_guest->();

        my $ip = get_node_ip($tnode);

        foreach my $volid (keys %$disks) {

            eval {
                PVE::Storage::volume_send($storage_config, $volid, $snapname,
                                          $ip, $incremental_snap,
                                          $param->{verbose}, $limit);
                $job->{fail} = 0;
            };

            if (my $err = $@) {
                cleanup_snapshot($disks_status, $snapname, $storage_config, $running, $ip);

                # The job is only flagged as 'error' after more than three
                # failures.
                $job->{fail}++;
                $job->{state} = 'error' if $job->{fail} > 3;

                $jobs->{$vmid} = $job;
                write_state($jobs);
                die $err;
            }

            $disks_status->{$volid}->{synced} = 1;
        }

        # delete the old snapshot if one exists
        cleanup_snapshot($disks_status, $snapname, $storage_config, $running, $ip, $lastsync)
            if $job->{lastsync} ne '0';

        $job->{lastsync} = $snap_time;
        $job->{state} = "ok";
        $jobs->{$vmid} = $job;
        write_state($jobs);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 60, 0, $sync_job);
    die $@ if $@;

    return $snap_time;
}
247
# Forward a snapshot query to the plugin object stored in the volume.
#
# Parameters:
#   $vol    - hash ref with a 'plugin' entry that provides get_snapshots()
#   $prefix - passed through to the plugin unchanged
#   $nodes  - passed through to the plugin unchanged
#
# Returns whatever the plugin's get_snapshots() method returns.
sub get_snapshots {
    my ($vol, $prefix, $nodes) = @_;

    return $vol->{plugin}->get_snapshots($vol, $prefix, $nodes);
}
254
# Forward an image transfer to the plugin object stored in the volume.
#
# Parameters:
#   $vol - hash ref with a 'plugin' entry that provides send_image()
#   $param, $ip, $all_snaps_in_delta, $alter_path - passed through unchanged
#
# Returns whatever the plugin's send_image() method returns.
sub send_image {
    my ($vol, $param, $ip, $all_snaps_in_delta, $alter_path) = @_;

    my @plugin_args = ($vol, $param, $ip, $all_snaps_in_delta, $alter_path);

    return $vol->{plugin}->send_image(@plugin_args);
}
261
# Create or re-enable the replication job of a guest and, unless told
# otherwise, run a first sync right away.
#
# Parameters:
#   $vmid    - ID of the guest
#   $no_sync - when defined (any value, including 0) the initial sync is
#              skipped
#   $target  - target node; falls back to replica_target from the guest
#              config when false
#
# Dies if no target node is known, if the target equals the local node, if
# the initial sync fails (the job is then marked 'error'), or if the state
# file lock cannot be obtained.
sub job_enable {
    my ($vmid, $no_sync, $target) = @_;

    my $update_state = sub {
        my $jobs = read_state();
        my $job = $jobs->{$vmid};
        my ($config) = $get_guestconfig->($vmid);

        # 15 is used whenever the guest config has no (or a false) interval.
        $job->{interval} = $config->{replica_interval} || 15;

        $job->{tnode} = $target || $config->{replica_target};
        die "Replication target must be set\n" if !defined($job->{tnode});

        die "Target and source node can't be the same\n"
            if $job->{tnode} eq $local_node;

        $job->{fail} = 0;

        # A job without a recorded sync time takes it from existing
        # replication snapshots, or starts from 0 if there are none.
        if (!defined($job->{lastsync})) {
            my $lastsync = get_lastsync($vmid);
            $job->{lastsync} = $lastsync ? $lastsync : 0;
        }

        $job->{state} = 'ok';
        $jobs->{$vmid} = $job;
        write_state($jobs);

        return if defined($no_sync);

        eval { sync_guest($vmid, { verbose => 1 }); };
        if (my $err = $@) {
            # sync_guest() writes the state file itself (e.g. the fail
            # counter), so read it again instead of writing back the copy
            # taken before the sync, which would discard those changes.
            my $current = read_state();
            $current->{$vmid}->{state} = 'error';
            write_state($current);
            die $err;
        }
    };

    PVE::Tools::lock_file_full($STATE_PATH, 5, 0, $update_state);
    die $@ if $@;
}
310
# Set the state of a guest's replication job to 'off'.
#
# Parameters:
#   $vmid - ID of the guest.
#
# Prints a notice when the state file has no job for the guest. Dies if the
# state file lock cannot be obtained.
sub job_disable {
    my ($vmid) = @_;

    my $mark_off = sub {
        my $jobs = read_state();

        if (!defined($jobs->{$vmid})) {
            print "No replica service for $vmid\n";
            return;
        }

        $jobs->{$vmid}->{state} = 'off';
        write_state($jobs);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 5, 0, $mark_off);
    die $@ if $@;
}
329
# Remove a guest's replication job from the state file.
#
# Parameters:
#   $vmid - ID of the guest.
#
# Prints a notice when the state file has no job for the guest. Dies if the
# state file lock cannot be obtained.
sub job_remove {
    my ($vmid) = @_;

    my $drop_job = sub {
        my $jobs = read_state();

        if (!defined($jobs->{$vmid})) {
            print "No replica service for $vmid\n";
            return;
        }

        delete $jobs->{$vmid};
        write_state($jobs);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 5, 0, $drop_job);
    die $@ if $@;
}
348
# Find the volumes of a guest that are flagged for replication and whose
# storage reports the 'replicate' feature.
#
# Parameters:
#   $config  - guest configuration
#   $vm_type - 'qemu' or 'lxc'
#   $running - passed on to PVE::Storage::volume_has_feature()
#   $noerr   - when true, no warning is printed for volumes that cannot be
#              replicated
#
# Returns, in list context, ($warnings, $syncable_disks); in scalar context
# only $syncable_disks. $syncable_disks is a hash ref keyed by volume ID,
# $warnings is 1 if at least one flagged volume cannot be replicated.
#
# Dies for any other $vm_type.
sub get_syncable_guestdisks {
    my ($config, $vm_type, $running, $noerr) = @_;

    my $storecfg = PVE::Storage::config();

    my $syncable_disks = {};
    my $warnings = 0;

    my $collect = sub {
        my ($id, $volume) = @_;

        # Only volumes with a true 'replica' flag are considered.
        return if !$volume->{replica};

        # QEMU drives keep the volume ID in 'file', all others in 'volume'.
        my $volname = $vm_type eq 'qemu' ? $volume->{file} : $volume->{volume};

        if (PVE::Storage::volume_has_feature($storecfg, 'replicate', $volname, undef, $running)) {
            $syncable_disks->{$volname} = 1;
            return;
        }

        warn "Can't sync Volume: $volname\n" if !$noerr;
        $warnings = 1;
    };

    if ($vm_type eq 'qemu') {
        PVE::QemuServer::foreach_drive($config, $collect);
    } elsif ($vm_type eq 'lxc') {
        PVE::LXC::Config->foreach_mountpoint($config, $collect);
    } else {
        die "Unknown VM type: $vm_type";
    }

    return ($warnings, $syncable_disks) if wantarray;
    return $syncable_disks;
}
386
# Remove snapshots, or whole volumes, of a guest's replicated disks.
#
# Parameters:
#   $vmid  - ID of the guest
#   $regex - passed to PVE::Storage::volume_snapshot_list() to select the
#            snapshots to delete. When undef, no snapshots are deleted;
#            instead every replicated volume itself is freed.
#   $node  - when defined, the work is done on that node (addressed through
#            get_node_ip()); otherwise on the local storage.
#
# NOTE: with both $regex and $node undefined this frees the guest's local
# volumes through PVE::Storage::vdisk_free().
sub destroy_all_snapshots {
    my ($vmid, $regex, $node) = @_;

    my $ip;
    $ip = get_node_ip($node) if defined($node);

    my ($guest_conf, $vm_type, $running) = $get_guestconfig->($vmid);

    my $disks = get_syncable_guestdisks($guest_conf, $vm_type);
    my $cfg = PVE::Storage::config();

    # All snapshot lists are gathered before anything is deleted.
    my $snapshots = {};
    for my $volid (keys %$disks) {
        $snapshots->{$volid} =
            PVE::Storage::volume_snapshot_list($cfg, $volid, $regex, $node, $ip);
    }

    for my $volid (keys %$snapshots) {

        if (!defined($regex)) {
            # No pattern given: free the whole volume.
            if ($ip) {
                my $cmd = ['ssh', '-o', 'Batchmode=yes', "root\@$ip", '--',
                           'pvesm', 'free', $volid];
                PVE::Tools::run_command($cmd);
            } else {
                PVE::Storage::vdisk_free($cfg, $volid);
            }
            next;
        }

        for my $snap (@{$snapshots->{$volid}}) {
            if ($ip) {
                PVE::Storage::volume_snapshot_delete_remote($cfg, $volid, $snap, $ip);
            } else {
                PVE::Storage::volume_snapshot_delete($cfg, $volid, $snap, $running);
            }
        }
    }

}
426
# Delete one replication snapshot from the volumes listed in a status hash.
#
# Parameters:
#   $disks         - hash ref keyed by volume ID; each value may carry the
#                    flags 'snapshot' and 'synced'. The extra key 'snapname'
#                    is skipped.
#   $snapname      - name of the snapshot to delete
#   $cfg           - storage configuration
#   $running       - passed to PVE::Storage::volume_snapshot_delete()
#   $ip            - passed to PVE::Storage::volume_snapshot_delete_remote()
#   $lastsync_snap - optional time stamp. When true, the snapshot
#                    "replica_<time stamp>" is deleted instead of $snapname.
#                    When defined, the per-volume flags are ignored and the
#                    snapshot is deleted remotely and locally on every
#                    volume.
sub cleanup_snapshot {
    my ($disks, $snapname, $cfg, $running, $ip, $lastsync_snap) = @_;

    $snapname = "replica_$lastsync_snap" if $lastsync_snap;

    my $ignore_flags = defined($lastsync_snap);

    for my $volid (grep { $_ ne "snapname" } keys %$disks) {

        # Remote copy first, then the local one.
        PVE::Storage::volume_snapshot_delete_remote($cfg, $volid, $snapname, $ip)
            if $ignore_flags || $disks->{$volid}->{synced};

        PVE::Storage::volume_snapshot_delete($cfg, $volid, $snapname, $running)
            if $ignore_flags || $disks->{$volid}->{snapshot};
    }
}
446
# Tear down the replication of a guest: delete the local replication
# snapshots, free the replicated volumes on the target node, drop the job
# from the state file and strip the replication keys from the guest config.
#
# Parameters:
#   $vmid - ID of the guest.
#
# Does nothing when the state file has no job for the guest. Dies if the
# state file lock cannot be obtained or one of the cleanup steps fails.
sub destroy_replica {
    my ($vmid) = @_;

    my $code = sub {

        my $jobs = read_state();

        return if !defined($jobs->{$vmid});

        my ($guest_conf, $vm_type) = $get_guestconfig->($vmid);

        # Prefer the target from the guest config and fall back to the node
        # recorded in the job, which is what sync_guest() replicates to.
        my $target = $guest_conf->{replica_target};
        $target = $jobs->{$vmid}->{tnode} if !defined($target);

        destroy_all_snapshots($vmid, 'replica');

        # Without a node, destroy_all_snapshots($vmid, undef, undef) would
        # free the guest's LOCAL volumes, so the call is only made when a
        # target node is actually known.
        if (defined($target) && $target ne '') {
            destroy_all_snapshots($vmid, undef, $target);
        } else {
            warn "No replication target known for guest $vmid, skipping remote cleanup\n";
        }

        delete $jobs->{$vmid};

        # 'replica_interval' is the key the rest of this file reads;
        # 'replica_rate_interval' is still removed in case it was ever set.
        delete $guest_conf->{replica_rate_limit};
        delete $guest_conf->{replica_interval};
        delete $guest_conf->{replica_rate_interval};
        delete $guest_conf->{replica_target};
        delete $guest_conf->{replica};

        if ($vm_type eq 'qemu') {
            PVE::QemuConfig->write_config($vmid, $guest_conf);
        } else {
            PVE::LXC::Config->write_config($vmid, $guest_conf);
        }
        write_state($jobs);
    };

    PVE::Tools::lock_file_full($STATE_PATH, 30, 0, $code);
    die $@ if $@;
}
479
# Derive the time of the last sync from the replication snapshots that exist
# on the local node.
#
# Parameters:
#   $vmid - ID of the guest.
#
# For every replicated volume the first listed snapshot named
# "replica_<digits>" is used. Returns that number, or undef when no volume
# has such a snapshot.
#
# Dies with "snapshots are not coherent" when two volumes disagree.
sub get_lastsync {
    my ($vmid) = @_;

    my ($conf, $vm_type) = $get_guestconfig->($vmid);

    my $sync_vol = get_syncable_guestdisks($conf, $vm_type);
    my $cfg = PVE::Storage::config();

    my $time;
    foreach my $volid (keys %$sync_vol) {
        my $list =
            PVE::Storage::volume_snapshot_list($cfg, $volid, 'replica', $local_node);

        # Only trust the capture of a match that succeeded: after a failed
        # match $1 still holds the value of an earlier one.
        my $snap_time;
        foreach my $snap (@{$list || []}) {
            if ($snap =~ m/^replica_(\d+)$/) {
                $snap_time = $1;
                last;
            }
        }
        next if !defined($snap_time);

        die "snapshots are not coherent\n"
            if defined($time) && $time ne $snap_time;
        $time = $snap_time;
    }

    return $time;
}
503
# Return one replication snapshot of a volume on the local node.
#
# Parameters:
#   $volid - volume ID.
#
# Returns the first entry of the list produced by
# PVE::Storage::volume_snapshot_list() for the prefix 'replica_', or undef if
# the list is empty.
# NOTE(review): whether the first entry is the most recent snapshot depends
# on the ordering of volume_snapshot_list() - confirm.
sub get_last_replica_snap {
    my ($volid) = @_;

    my $snapshots = PVE::Storage::volume_snapshot_list(
        PVE::Storage::config(), $volid, 'replica_', $local_node);

    return shift @$snapshots;
}
512
# Check whether a guest has replicated volumes and all of them can be
# synced.
#
# Parameters:
#   $conf    - guest configuration
#   $vm_type - 'qemu' or 'lxc'
#
# Returns 1 when get_syncable_guestdisks() reports no warning and at least
# one syncable volume, undef otherwise.
sub check_guest_volumes_syncable {
    my ($conf, $vm_type) = @_;

    # NOTE(review): the literal 1 is the third argument, which
    # get_syncable_guestdisks() treats as $running, not as $noerr - confirm
    # that this is intended.
    my ($warnings, $disks) = get_syncable_guestdisks($conf, $vm_type, 1);

    return 1 if !$warnings && %$disks;

    return undef;
}
522
# Apply a changed replication setting of a guest to its job.
#
# Parameters:
#   $vmid  - ID of the guest
#   $key   - 'replica_target', 'replica_interval' or 'replica_rate_limit'
#   $value - new value
#
# A new 'replica_target' destroys the existing replica and enables the job
# for the new target. For the other keys only the state file entry is
# changed; nothing happens when the guest has no job.
#
# Dies for an unknown $key and when the state file lock cannot be obtained.
sub update_conf {
    my ($vmid, $key, $value) = @_;

    if ($key eq 'replica_target') {
        destroy_replica($vmid);
        job_enable($vmid, undef, $value);
        return;
    }

    my $update = sub {
        my $jobs = read_state();

        return if !defined($jobs->{$vmid});

        my $job = $jobs->{$vmid};

        if ($key eq 'replica_interval') {
            # 15 is used whenever no (or a false) interval is given.
            $job->{interval} = $value || 15;
        } elsif ($key eq 'replica_rate_limit') {
            # A false value removes the limit from the job.
            if ($value) {
                $job->{limit} = $value;
            } else {
                delete $job->{limit};
            }
        } else {
            die "Config parameter $key not known";
        }

        write_state($jobs);
    };

    my $res = PVE::Tools::lock_file_full($STATE_PATH, 60, 0, $update);
    # Report lock and update failures like the other subs in this file do.
    die $@ if $@;

    return $res;
}
551
552 1;