]> git.proxmox.com Git - pmg-api.git/blame - PMG/Cluster.pm
upgradedb: fix typo
[pmg-api.git] / PMG / Cluster.pm
CommitLineData
0854fb22
DM
1package PMG::Cluster;
2
3use strict;
4use warnings;
45e68618 5use Data::Dumper;
0854fb22 6use Socket;
cfdf6608 7use File::Path;
9430b6d4 8use Time::HiRes qw (gettimeofday tv_interval);
45e68618 9
a7c7cad7 10use PVE::SafeSyslog;
0854fb22
DM
11use PVE::Tools;
12use PVE::INotify;
13
1fb2ab76 14use PMG::Utils;
a7c7cad7 15use PMG::Config;
9f67f5b3 16use PMG::ClusterConfig;
db303db4
DM
17use PMG::RuleDB;
18use PMG::RuleCache;
9430b6d4 19use PMG::MailQueue;
db303db4 20use PVE::APIClient::LWP;
0854fb22 21
8737f93a
DM
22sub remote_node_ip {
23 my ($nodename, $noerr) = @_;
24
d8782874 25 my $cinfo = PMG::ClusterConfig->new();
8737f93a 26
45e68618 27 foreach my $entry (values %{$cinfo->{ids}}) {
8737f93a
DM
28 if ($entry->{name} eq $nodename) {
29 my $ip = $entry->{ip};
30 return $ip if !wantarray;
31 my $family = PVE::Tools::get_host_address_family($ip);
32 return ($ip, $family);
33 }
34 }
35
36 # fallback: try to get IP by other means
9f67f5b3 37 return PMG::Utils::lookup_node_ip($nodename, $noerr);
8737f93a
DM
38}
39
d2e43f9e
DM
40sub get_master_node {
41 my ($cinfo) = @_;
42
d8782874 43 $cinfo = PMG::ClusterConfig->new() if !$cinfo;
d2e43f9e
DM
44
45 return $cinfo->{master}->{name} if defined($cinfo->{master});
46
47 return 'localhost';
48}
49
cba17aeb
DM
50sub read_local_ssl_cert_fingerprint {
51 my $cert_path = "/etc/pmg/pmg-api.pem";
0854fb22 52
cba17aeb
DM
53 my $cert;
54 eval {
55 my $bio = Net::SSLeay::BIO_new_file($cert_path, 'r');
56 $cert = Net::SSLeay::PEM_read_bio_X509($bio);
57 Net::SSLeay::BIO_free($bio);
58 };
59 if (my $err = $@) {
60 die "unable to read certificate '$cert_path' - $err\n";
61 }
0854fb22 62
cba17aeb
DM
63 if (!defined($cert)) {
64 die "unable to read certificate '$cert_path' - got empty value\n";
65 }
66
67 my $fp;
68 eval {
69 $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256');
70 };
71 if (my $err = $@) {
72 die "unable to get fingerprint for '$cert_path' - $err\n";
73 }
0854fb22 74
cba17aeb
DM
75 if (!defined($fp) || $fp eq '') {
76 die "unable to get fingerprint for '$cert_path' - got empty value\n";
77 }
0854fb22 78
cba17aeb
DM
79 return $fp;
80}
0854fb22 81
cba17aeb
DM
82my $hostrsapubkey_fn = '/etc/ssh/ssh_host_rsa_key.pub';
83my $rootrsakey_fn = '/root/.ssh/id_rsa';
84my $rootrsapubkey_fn = '/root/.ssh/id_rsa.pub';
0854fb22 85
cba17aeb
DM
86sub read_local_cluster_info {
87
88 my $res = {};
89
90 my $hostrsapubkey = PVE::Tools::file_read_firstline($hostrsapubkey_fn);
91 $hostrsapubkey =~ s/^.*ssh-rsa\s+//i;
92 $hostrsapubkey =~ s/\s+root\@\S+\s*$//i;
93
94 die "unable to parse ${hostrsapubkey_fn}\n"
95 if $hostrsapubkey !~ m/^[A-Za-z0-9\.\/\+]{200,}$/;
0854fb22
DM
96
97 my $nodename = PVE::INotify::nodename();
98
cba17aeb 99 $res->{name} = $nodename;
0854fb22 100
cba17aeb 101 $res->{ip} = PMG::Utils::lookup_node_ip($nodename);
0854fb22 102
cba17aeb 103 $res->{hostrsapubkey} = $hostrsapubkey;
0854fb22 104
cba17aeb
DM
105 if (! -f $rootrsapubkey_fn) {
106 unlink $rootrsakey_fn;
107 my $cmd = ['ssh-keygen', '-t', 'rsa', '-N', '', '-b', '2048',
108 '-f', $rootrsakey_fn];
1fb2ab76 109 PMG::Utils::run_silent_cmd($cmd);
cba17aeb
DM
110 }
111
112 my $rootrsapubkey = PVE::Tools::file_read_firstline($rootrsapubkey_fn);
113 $rootrsapubkey =~ s/^.*ssh-rsa\s+//i;
114 $rootrsapubkey =~ s/\s+root\@\S+\s*$//i;
115
116 die "unable to parse ${rootrsapubkey_fn}\n"
117 if $rootrsapubkey !~ m/^[A-Za-z0-9\.\/\+]{200,}$/;
118
119 $res->{rootrsapubkey} = $rootrsapubkey;
120
121 $res->{fingerprint} = read_local_ssl_cert_fingerprint();
122
123 return $res;
124}
125
126# X509 Certificate cache helper
127
128my $cert_cache_nodes = {};
129my $cert_cache_timestamp = time();
130my $cert_cache_fingerprints = {};
0854fb22 131
cba17aeb 132sub update_cert_cache {
0854fb22 133
cba17aeb
DM
134 $cert_cache_timestamp = time();
135
136 $cert_cache_fingerprints = {};
137 $cert_cache_nodes = {};
138
d8782874 139 my $cinfo = PMG::ClusterConfig->new();
cba17aeb
DM
140
141 foreach my $entry (values %{$cinfo->{ids}}) {
142 my $node = $entry->{name};
143 my $fp = $entry->{fingerprint};
144 if ($node && $fp) {
145 $cert_cache_fingerprints->{$fp} = 1;
146 $cert_cache_nodes->{$node} = $fp;
0854fb22
DM
147 }
148 }
149}
150
151# load and cache cert fingerprint once
152sub initialize_cert_cache {
153 my ($node) = @_;
154
cba17aeb 155 update_cert_cache()
0854fb22
DM
156 if defined($node) && !defined($cert_cache_nodes->{$node});
157}
158
159sub check_cert_fingerprint {
160 my ($cert) = @_;
161
162 # clear cache every 30 minutes at least
cba17aeb 163 update_cert_cache() if time() - $cert_cache_timestamp >= 60*30;
0854fb22
DM
164
165 # get fingerprint of server certificate
166 my $fp;
167 eval {
168 $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256');
169 };
170 return 0 if $@ || !defined($fp) || $fp eq ''; # error
171
172 my $check = sub {
173 for my $expected (keys %$cert_cache_fingerprints) {
174 return 1 if $fp eq $expected;
175 }
176 return 0;
177 };
178
cba17aeb 179 return 1 if $check->();
0854fb22
DM
180
181 # clear cache and retry at most once every minute
182 if (time() - $cert_cache_timestamp >= 60) {
183 syslog ('info', "Could not verify remote node certificate '$fp' with list of pinned certificates, refreshing cache");
184 update_cert_cache();
cba17aeb 185 return $check->();
0854fb22
DM
186 }
187
188 return 0;
189}
190
58072364
DM
191my $sshglobalknownhosts = "/etc/ssh/ssh_known_hosts2";
192my $rootsshauthkeys = "/root/.ssh/authorized_keys";
193my $ssh_rsa_id = "/root/.ssh/id_rsa.pub";
194
195sub update_ssh_keys {
196 my ($cinfo) = @_;
197
198 my $data = '';
199 foreach my $node (values %{$cinfo->{ids}}) {
200 $data .= "$node->{ip} ssh-rsa $node->{hostrsapubkey}\n";
201 $data .= "$node->{name} ssh-rsa $node->{hostrsapubkey}\n";
202 }
203
204 PVE::Tools::file_set_contents($sshglobalknownhosts, $data);
205
206 $data = '';
207
208 # always add ourself
209 if (-f $ssh_rsa_id) {
210 my $pub = PVE::Tools::file_get_contents($ssh_rsa_id);
211 chomp($pub);
212 $data .= "$pub\n";
213 }
214
215 foreach my $node (values %{$cinfo->{ids}}) {
216 $data .= "ssh-rsa $node->{rootrsapubkey} root\@$node->{name}\n";
217 }
218
219 if (-f $rootsshauthkeys) {
a7c7cad7
DM
220 my $mykey = PVE::Tools::file_get_contents($rootsshauthkeys, 128*1024);
221 chomp($mykey);
222 $data .= "$mykey\n";
58072364
DM
223 }
224
225 my $newdata = "";
226 my $vhash = {};
227 my @lines = split(/\n/, $data);
228 foreach my $line (@lines) {
229 if ($line !~ /^#/ && $line =~ m/(^|\s)ssh-(rsa|dsa)\s+(\S+)\s+\S+$/) {
230 next if $vhash->{$3}++;
231 }
232 $newdata .= "$line\n";
233 }
234
235 PVE::Tools::file_set_contents($rootsshauthkeys, $newdata, 0600);
236}
237
a7c7cad7
DM
238my $cfgdir = '/etc/pmg';
239my $syncdir = "$cfgdir/master";
240
241my $cond_commit_synced_file = sub {
242 my ($filename, $dstfn) = @_;
243
244 $dstfn = "$cfgdir/$filename" if !defined($dstfn);
245 my $srcfn = "$syncdir/$filename";
246
247 if (! -f $srcfn) {
248 unlink $dstfn;
249 return;
250 }
251
252 my $new = PVE::Tools::file_get_contents($srcfn, 1024*1024);
253
254 if (-f $dstfn) {
255 my $old = PVE::Tools::file_get_contents($dstfn, 1024*1024);
256 return 0 if $new eq $old;
257 }
258
259 rename($srcfn, $dstfn) ||
260 die "cond_rename_file '$filename' failed - $!\n";
261
262 print STDERR "updated $dstfn\n";
263
264 return 1;
265};
266
c9dae0df
DM
267my $rsync_command = sub {
268 my ($host_key_alias, @args) = @_;
269
270 my $ssh_cmd = '--rsh=ssh -l root -o BatchMode=yes';
271 $ssh_cmd .= " -o HostKeyAlias=${host_key_alias}" if $host_key_alias;
272
273 my $cmd = ['rsync', $ssh_cmd, '-q', @args];
274
275 return $cmd;
276};
277
278sub sync_quarantine_files {
279 my ($host_ip, $host_name, $flistname) = @_;
280
9430b6d4
DM
281 my $spooldir = $PMG::MailQueue::spooldir;
282
c9dae0df
DM
283 my $cmd = $rsync_command->(
284 $host_name, '--timeout', '10', "${host_ip}:$spooldir", $spooldir,
285 '--files-from', $flistname);
286
9430b6d4 287 PVE::Tools::run_command($cmd);
c9dae0df
DM
288}
289
290sub sync_spooldir {
291 my ($host_ip, $host_name, $rcid) = @_;
292
9430b6d4
DM
293 my $spooldir = $PMG::MailQueue::spooldir;
294
c9dae0df
DM
295 mkdir "$spooldir/cluster/";
296 my $syncdir = "$spooldir/cluster/$rcid";
297 mkdir $syncdir;
298
299 my $cmd = $rsync_command->(
300 $host_name, '-aq', '--timeout', '10', "${host_ip}:$syncdir/", $syncdir);
301
302 foreach my $incl (('spam/', 'spam/*', 'spam/*/*', 'virus/', 'virus/*', 'virus/*/*')) {
303 push @$cmd, '--include', $incl;
304 }
305
306 push @$cmd, '--exclude', '*';
307
308 PVE::Tools::run_command($cmd);
309}
310
311sub sync_master_quar {
312 my ($host_ip, $host_name) = @_;
313
9430b6d4
DM
314 my $spooldir = $PMG::MailQueue::spooldir;
315
c9dae0df
DM
316 my $syncdir = "$spooldir/cluster/";
317 mkdir $syncdir;
318
319 my $cmd = $rsync_command->(
320 $host_name, '-aq', '--timeout', '10', "${host_ip}:$syncdir", $syncdir);
321
322 PVE::Tools::run_command($cmd);
323}
324
58072364 325sub sync_config_from_master {
809ae8f4 326 my ($master_name, $master_ip, $noreload) = @_;
58072364 327
58072364 328 mkdir $syncdir;
a7c7cad7 329 File::Path::remove_tree($syncdir, {keep_root => 1});
58072364 330
a7c7cad7
DM
331 my $sa_conf_dir = "/etc/mail/spamassassin";
332 my $sa_custom_cf = "custom.cf";
58072364 333
c9dae0df 334 my $cmd = $rsync_command->(
f4b7112b 335 $master_name, '-aq',
c9dae0df
DM
336 "${master_ip}:$cfgdir/* ${sa_conf_dir}/${sa_custom_cf}",
337 "$syncdir/",
f4b7112b 338 '--exclude', 'master/',
c9dae0df
DM
339 '--exclude', '*~',
340 '--exclude', '*.db',
341 '--exclude', 'pmg-api.pem',
342 '--exclude', 'pmg-tls.pem',
343 );
58072364
DM
344
345 my $errmsg = "syncing master configuration from '${master_ip}' failed";
346 PVE::Tools::run_command($cmd, errmsg => $errmsg);
a7c7cad7
DM
347
348 # verify that the remote host is cluster master
349 open (my $fh, '<', "$syncdir/cluster.conf") ||
350 die "unable to open synced cluster.conf - $!\n";
a7c7cad7 351
809ae8f4
DM
352 my $cinfo = PMG::ClusterConfig::read_cluster_conf('cluster.conf', $fh);
353
354 if (!$cinfo->{master} || ($cinfo->{master}->{ip} ne $master_ip)) {
a7c7cad7
DM
355 die "host '$master_ip' is not cluster master\n";
356 }
357
809ae8f4
DM
358 my $role = $cinfo->{'local'}->{type} // '-';
359 die "local node '$cinfo->{local}->{name}' not part of cluster\n"
a7c7cad7
DM
360 if $role eq '-';
361
809ae8f4 362 die "local node '$cinfo->{local}->{name}' is new cluster master\n"
a7c7cad7
DM
363 if $role eq 'master';
364
a7c7cad7 365 $cond_commit_synced_file->('cluster.conf');
a7c7cad7
DM
366
367 my $files = [
368 'pmg-authkey.key',
369 'pmg-authkey.pub',
370 'pmg-csrf.key',
371 'ldap.conf',
372 'user.conf',
7cac3e28
DM
373 'domains',
374 'mynetworks',
375 'transport',
a7c7cad7
DM
376 ];
377
378 foreach my $filename (@$files) {
379 $cond_commit_synced_file->($filename);
380 }
381
382 my $force_restart = {};
383
384 if ($cond_commit_synced_file->($sa_custom_cf, "${sa_conf_dir}/${sa_custom_cf}")) {
385 $force_restart->{spam} = 1;
386 }
387
388 $cond_commit_synced_file->('pmg.conf');
f4b7112b
DM
389
390 # sync user templates files/symlinks (not recursive)
391 my $srcdir = "$syncdir/templates";
392 if (-d $srcdir) {
393 my $dstdir = "$cfgdir/templates";
394 mkdir $dstdir;
395 my $names_hash = {};
396 foreach my $fn (<$srcdir/*>) {
397 next if $fn !~ m|^($srcdir/(.*))$|;
398 $fn = $1; # untaint;
399 my $name = $2;
400 $names_hash->{$name} = 1;
401 my $target = "$dstdir/$name";
402 if (-f $fn) {
403 $cond_commit_synced_file->("templates/$name", $target);
404 } elsif (-l $fn) {
405 warn "update $target failed - $!\n" if !rename($fn, $target);
406 }
407 }
408 # remove vanished files
409 foreach my $fn (<$dstdir/*>) {
410 next if $fn !~ m|^($dstdir/(.*))$|;
411 $fn = $1; # untaint;
412 my $name = $2;
413 next if $names_hash->{$name};
414 warn "unlink $fn failed - $!\n" if !unlink($fn);
415 }
416 }
58072364
DM
417}
418
db303db4
DM
419sub sync_ruledb_from_master {
420 my ($ldb, $rdb, $ni, $ticket) = @_;
421
422 my $ruledb = PMG::RuleDB->new($ldb);
423 my $rulecache = PMG::RuleCache->new($ruledb);
424
425 my $conn = PVE::APIClient::LWP->new(
426 ticket => $ticket,
427 cookie_name => 'PMGAuthCookie',
428 host => $ni->{ip},
429 cached_fingerprints => {
430 $ni->{fingerprint} => 1,
431 });
432
433 my $digest = $conn->get("/config/ruledb/digest", {});
434
435 return if $digest eq $rulecache->{digest}; # no changes
436
437 syslog('info', "detected rule database changes - starting sync from '$ni->{ip}'");
438
439 eval {
440 $ldb->begin_work;
441
442 $ldb->do("DELETE FROM Rule");
443 $ldb->do("DELETE FROM RuleGroup");
444 $ldb->do("DELETE FROM ObjectGroup");
445 $ldb->do("DELETE FROM Object");
446 $ldb->do("DELETE FROM Attribut");
447
448 eval {
449 $rdb->begin_work;
450
451 # read a consistent snapshot
452 $rdb->do("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
453
454 PMG::DBTools::copy_table($ldb, $rdb, "Rule");
455 PMG::DBTools::copy_table($ldb, $rdb, "RuleGroup");
456 PMG::DBTools::copy_table($ldb, $rdb, "ObjectGroup");
457 PMG::DBTools::copy_table($ldb, $rdb, "Object", 'value');
458 PMG::DBTools::copy_table($ldb, $rdb, "Attribut", 'value');
459 };
460
461 $rdb->rollback; # end transaction
462
463 die $@ if $@;
464
465 # update sequences
466
467 $ldb->do("SELECT setval('rule_id_seq', max(id)+1) FROM Rule");
468 $ldb->do("SELECT setval('object_id_seq', max(id)+1) FROM Object");
469 $ldb->do("SELECT setval('objectgroup_id_seq', max(id)+1) FROM ObjectGroup");
470
471 $ldb->commit;
472 };
473 if (my $err = $@) {
474 $ldb->rollback;
475 die $err;
476 }
477
478 syslog('info', "finished rule database sync from host '$ni->{ip}'");
479}
480
9430b6d4
DM
481sub sync_quarantine_db {
482 my ($ldb, $rdb, $ni, $rsynctime_ref) = @_;
483
484 my $rcid = $ni->{cid};
485
486 my $maxmails = 100000;
487
488 my $mscount = 0;
489
490 my $ctime = PMG::DBTools::get_remote_time($rdb);
491
492 my $maxcount = 1000;
493
494 my $count;
495
496 PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastid_CMailStore', -1, undef);
497
498 do { # get new values
499
500 $count = 0;
501
502 my $flistname = "/tmp/quarantinefilelist.$$";
503
504 eval {
505 $ldb->begin_work;
506
507 open(my $flistfh, '>', $flistname) ||
508 die "unable to open file '$flistname' - $!\n";
509
510 my $lastid = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastid_CMailStore');
511
512 # sync CMailStore
513
514 my $sth = $rdb->prepare(
515 "SELECT * from CMailstore WHERE cid = ? AND rid > ? " .
516 "ORDER BY cid,rid LIMIT ?");
517 $sth->execute($rcid, $lastid, $maxcount);
518
519 my $maxid;
520 my $callback = sub {
521 my $ref = shift;
522 $maxid = $ref->{rid};
523 print $flistfh "$ref->{file}\n";
524 };
525
526 my $attrs = [qw(cid rid time qtype bytes spamlevel info sender header file)];
527 $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CMailStore', $attrs, $callback);
528
529 close($flistfh);
530
531 my $starttime = [ gettimeofday() ];
532 sync_quarantine_files($ni->{ip}, $ni->{name}, $flistname);
533 $$rsynctime_ref += tv_interval($starttime);
534
535 if ($maxid) {
536 # sync CMSReceivers
537
538 $sth = $rdb->prepare(
539 "SELECT * from CMSReceivers WHERE " .
540 "CMailStore_CID = ? AND CMailStore_RID > ? " .
541 "AND CMailStore_RID <= ?");
542 $sth->execute($rcid, $lastid, $maxid);
543
39d91358 544 $attrs = [qw(cmailstore_cid cmailstore_rid pmail receiver status mtime)];
9430b6d4
DM
545 PMG::DBTools::copy_selected_data($ldb, $sth, 'CMSReceivers', $attrs);
546
547 PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastid_CMailStore', $maxid);
548 }
549
550 $ldb->commit;
551 };
552 my $err = $@;
553
554 unlink $flistname;
555
556 if ($err) {
557 $ldb->rollback;
558 die $err;
559 }
560
561 $mscount += $count;
562
563 last if $mscount >= $maxmails;
564
565 } while ($count >= $maxcount);
566
567 PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastmt_CMSReceivers', 0, undef);
568
569 eval { # synchronize status updates
570 $ldb->begin_work;
571
572 my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers');
573
574 my $sth = $rdb->prepare ("SELECT * from CMSReceivers WHERE mtime >= ? AND status != 'N'");
575 $sth->execute($lastmt);
576
577 my $update_sth = $ldb->prepare(
578 "UPDATE CMSReceivers SET status = ? WHERE " .
579 "CMailstore_CID = ? AND CMailstore_RID = ? AND PMail = ?;");
580 while (my $ref = $sth->fetchrow_hashref()) {
581 $update_sth->execute($ref->{status}, $ref->{cmailstore_cid},
582 $ref->{cmailstore_rid}, $ref->{pmail});
583 }
584
585 PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers', $ctime);
586
587 $ldb->commit;
588 };
589 if (my $err = $@) {
590 $ldb->rollback;
591 die $err;
592 }
593
594 return $mscount;
595}
596
597sub sync_statistic_db {
598 my ($ldb, $rdb, $ni) = @_;
599
600 my $rcid = $ni->{cid};
601
602 my $maxmails = 100000;
603
604 my $mscount = 0;
605
606 my $maxcount = 1000;
607
608 my $count;
609
610 PMG::DBTools::create_clusterinfo_default(
611 $ldb, $rcid, 'lastid_CStatistic', -1, undef);
612
613 do { # get new values
614
615 $count = 0;
616
617 eval {
618 $ldb->begin_work;
619
620 my $lastid = PMG::DBTools::read_int_clusterinfo(
621 $ldb, $rcid, 'lastid_CStatistic');
622
623 # sync CStatistic
624
625 my $sth = $rdb->prepare(
626 "SELECT * from CStatistic " .
627 "WHERE cid = ? AND rid > ? " .
628 "ORDER BY cid, rid LIMIT ?");
629 $sth->execute($rcid, $lastid, $maxcount);
630
631 my $maxid;
632 my $callback = sub {
633 my $ref = shift;
634 $maxid = $ref->{rid};
635 };
636
637 my $attrs = [qw(cid rid time bytes direction spamlevel ptime virusinfo sender)];
638 $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CStatistic', $attrs, $callback);
639
640 if ($maxid) {
641 # sync CReceivers
642
643 $sth = $rdb->prepare(
644 "SELECT * from CReceivers WHERE " .
645 "CStatistic_CID = ? AND CStatistic_RID > ? AND CStatistic_RID <= ?");
646 $sth->execute($rcid, $lastid, $maxid);
647
648 $attrs = [qw(cstatistic_cid cstatistic_rid blocked receiver)];
649 PMG::DBTools::copy_selected_data($ldb, $sth, 'CReceivers', $attrs);
650
651 PMG::DBTools::write_maxint_clusterinfo ($ldb, $rcid, 'lastid_CStatistic', $maxid);
652 }
653
654 $ldb->commit;
655 };
656 if (my $err = $@) {
657 $ldb->rollback;
658 die $err;
659 }
660
661 $mscount += $count;
662
663 last if $mscount >= $maxmails;
664
665 } while ($count >= $maxcount);
e86b85a3
DM
666
667 return $mscount;
9430b6d4
DM
668}
669
986eec31 670my $sync_generic_mtime_db = sub {
9430b6d4
DM
671 my ($ldb, $rdb, $ni, $table, $selectfunc, $mergefunc) = @_;
672
9430b6d4
DM
673 my $ctime = PMG::DBTools::get_remote_time($rdb);
674
675 PMG::DBTools::create_clusterinfo_default($ldb, $ni->{cid}, "lastmt_$table", 0, undef);
676
677 my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $ni->{cid}, "lastmt_$table");
678
4c93256a 679 my $sql_cmd = $selectfunc->($ctime, $lastmt);
9430b6d4 680
4c93256a 681 my $sth = $rdb->prepare($sql_cmd);
9430b6d4
DM
682
683 $sth->execute();
684
986eec31
DM
685 my $updates = 0;
686
4c93256a
DM
687 eval {
688 # use transaction to speedup things
689 my $max = 1000; # UPDATE MAX ENTRIES AT ONCE
4c93256a
DM
690 my $count = 0;
691 while (my $ref = $sth->fetchrow_hashref()) {
9d5bdd2a
DM
692 $ldb->begin_work if !$count;
693 $mergefunc->($ref);
4c93256a
DM
694 if (++$count >= $max) {
695 $count = 0;
696 $ldb->commit;
9430b6d4 697 }
986eec31 698 $updates++;
9430b6d4 699 }
9d5bdd2a
DM
700
701 $ldb->commit if $count;
9430b6d4 702 };
4c93256a
DM
703 if (my $err = $@) {
704 $ldb->rollback;
705 die $err;
9430b6d4
DM
706 }
707
9430b6d4
DM
708 PMG::DBTools::write_maxint_clusterinfo($ldb, $ni->{cid}, "lastmt_$table", $ctime);
709
986eec31
DM
710 return $updates;
711};
9430b6d4
DM
712
713sub sync_greylist_db {
714 my ($dbh, $rdb, $ni) = @_;
715
716 my $selectfunc = sub {
717 my ($ctime, $lastmt) = @_;
718 return "SELECT * from CGreylist WHERE extime >= $ctime AND " .
719 "mtime >= $lastmt AND CID != 0";
720 };
721
2e049252 722 my $merge_sth = $dbh->prepare($PMG::DBTools::cgreylist_merge_sql);
9430b6d4 723 my $mergefunc = sub {
986eec31 724 my ($ref) = @_;
9430b6d4 725
f413f920
DM
726 $merge_sth->execute(
727 $ref->{ipnet}, $ref->{host}, $ref->{sender}, $ref->{receiver},
728 $ref->{instance}, $ref->{rctime}, $ref->{extime}, $ref->{delay},
729 $ref->{blocked}, $ref->{passed}, 0, $ref->{cid});
9430b6d4
DM
730 };
731
986eec31 732 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'CGreylist', $selectfunc, $mergefunc);
9430b6d4
DM
733}
734
735sub sync_userprefs_db {
736 my ($dbh, $rdb, $ni) = @_;
737
738 my $selectfunc = sub {
739 my ($ctime, $lastmt) = @_;
740
741 return "SELECT * from UserPrefs WHERE mtime >= $lastmt";
742 };
743
7d13157e 744 my $merge_sth = $dbh->prepare(
471b05ed 745 "INSERT INTO UserPrefs (PMail, Name, Data, MTime) " .
0527ea1a 746 'VALUES (?, ?, ?, 0) ' .
471b05ed 747 'ON CONFLICT (PMail, Name) DO UPDATE SET ' .
dbdf1298 748 # Note: MTime = 0 ==> this is just a copy from somewhere else, not modified
3a9ffd85 749 'MTime = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN 0 ELSE UserPrefs.MTime END, ' .
0527ea1a 750 'Data = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN excluded.Data ELSE UserPrefs.Data END');
471b05ed 751
9430b6d4 752 my $mergefunc = sub {
986eec31 753 my ($ref) = @_;
9430b6d4 754
7d13157e 755 $merge_sth->execute($ref->{pmail}, $ref->{name}, $ref->{data});
9430b6d4
DM
756 };
757
986eec31 758 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'UserPrefs', $selectfunc, $mergefunc);
9430b6d4
DM
759}
760
761sub sync_domainstat_db {
762 my ($dbh, $rdb, $ni) = @_;
763
764 my $selectfunc = sub {
765 my ($ctime, $lastmt) = @_;
766 return "SELECT * from DomainStat WHERE mtime >= $lastmt";
767 };
768
1c7ea32c
DM
769 my $merge_sth = $dbh->prepare(
770 'INSERT INTO Domainstat ' .
771 '(Time,Domain,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
772 'BouncesIn,BouncesOut,PTimeSum,Mtime) ' .
773 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
774 'ON CONFLICT (Time, Domain) DO UPDATE SET ' .
775 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
776 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
777 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
778 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
779 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
780 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
781
9430b6d4 782 my $mergefunc = sub {
986eec31 783 my ($ref) = @_;
9430b6d4 784
1c7ea32c
DM
785 $merge_sth->execute(
786 $ref->{time}, $ref->{domain}, $ref->{countin}, $ref->{countout},
787 $ref->{bytesin}, $ref->{bytesout},
788 $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout},
789 $ref->{bouncesin}, $ref->{bouncesout}, $ref->{ptimesum}, $ref->{mtime});
9430b6d4
DM
790 };
791
986eec31 792 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DomainStat', $selectfunc, $mergefunc);
9430b6d4
DM
793}
794
795sub sync_dailystat_db {
796 my ($dbh, $rdb, $ni) = @_;
797
798 my $selectfunc = sub {
799 my ($ctime, $lastmt) = @_;
800 return "SELECT * from DailyStat WHERE mtime >= $lastmt";
8bf584ae
DM
801 };
802
803 my $merge_sth = $dbh->prepare(
804 'INSERT INTO DailyStat ' .
805 '(Time,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
806 'BouncesIn,BouncesOut,GreylistCount,SPFCount,RBLCount,PTimeSum,Mtime) ' .
807 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
808 'ON CONFLICT (Time) DO UPDATE SET ' .
809 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
810 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
811 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
812 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
813 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
814 'GreylistCount = excluded.GreylistCount, SPFCount = excluded.SpfCount, ' .
815 'RBLCount = excluded.RBLCount, ' .
816 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
9430b6d4
DM
817
818 my $mergefunc = sub {
986eec31 819 my ($ref) = @_;
9430b6d4 820
dbdf1298 821 $merge_sth->execute(
8bf584ae
DM
822 $ref->{time}, $ref->{countin}, $ref->{countout},
823 $ref->{bytesin}, $ref->{bytesout},
824 $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout},
825 $ref->{bouncesin}, $ref->{bouncesout}, $ref->{greylistcount},
826 $ref->{spfcount}, $ref->{rblcount}, $ref->{ptimesum}, $ref->{mtime});
9430b6d4
DM
827 };
828
986eec31 829 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DailyStat', $selectfunc, $mergefunc);
9430b6d4
DM
830}
831
832sub sync_virusinfo_db {
833 my ($dbh, $rdb, $ni) = @_;
834
835 my $selectfunc = sub {
836 my ($ctime, $lastmt) = @_;
837 return "SELECT * from VirusInfo WHERE mtime >= $lastmt";
838 };
839
80615ad6
DM
840 my $merge_sth = $dbh->prepare(
841 'INSERT INTO VirusInfo (Time,Name,Count,MTime) ' .
842 'VALUES (?,?,?,?) ' .
843 'ON CONFLICT (Time,Name) DO UPDATE SET ' .
844 'Count = excluded.Count , MTime = excluded.MTime');
845
9430b6d4 846 my $mergefunc = sub {
986eec31 847 my ($ref) = @_;
9430b6d4 848
80615ad6 849 $merge_sth->execute($ref->{time}, $ref->{name}, $ref->{count}, $ref->{mtime});
9430b6d4
DM
850 };
851
986eec31 852 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'VirusInfo', $selectfunc, $mergefunc);
9430b6d4
DM
853}
854
855sub sync_deleted_nodes_from_master {
856 my ($ldb, $masterdb, $cinfo, $masterni, $rsynctime_ref) = @_;
857
858 my $rsynctime = 0;
859
860 my $cid_hash = {}; # fast lookup
5c6e6eeb 861 foreach my $ni (values %{$cinfo->{ids}}) {
9430b6d4
DM
862 $cid_hash->{$ni->{cid}} = $ni;
863 }
864
865 my $spooldir = $PMG::MailQueue::spooldir;
866
5c6e6eeb
DM
867 my $maxcid = $cinfo->{master}->{maxcid} // 0;
868
869 for (my $rcid = 1; $rcid <= $maxcid; $rcid++) {
9430b6d4
DM
870 next if $cid_hash->{$rcid};
871
872 my $done_marker = "$spooldir/cluster/$rcid/.synced-deleted-node";
873
874 next if -f $done_marker; # already synced
875
876 syslog('info', "syncing deleted node $rcid from master '$masterni->{ip}'");
877
878 my $starttime = [ gettimeofday() ];
879 sync_spooldir($masterni->{ip}, $masterni->{name}, $rcid);
880 $$rsynctime_ref += tv_interval($starttime);
881
882 my $fake_ni = {
883 ip => $masterni->{ip},
884 name => $masterni->{name},
885 cid => $rcid,
886 };
887
888 sync_quarantine_db($ldb, $masterdb, $fake_ni);
889
890 sync_statistic_db ($ldb, $masterdb, $fake_ni);
891
892 open(my $fh, ">>", $done_marker);
893 }
894}
895
896
0854fb22 8971;