]> git.proxmox.com Git - pmg-api.git/blame - src/PMG/Cluster.pm
fix #2437: config: Add new tls_inbound_domains postfix map
[pmg-api.git] / src / PMG / Cluster.pm
CommitLineData
0854fb22
DM
1package PMG::Cluster;
2
3use strict;
4use warnings;
45e68618 5use Data::Dumper;
0854fb22 6use Socket;
cfdf6608 7use File::Path;
9430b6d4 8use Time::HiRes qw (gettimeofday tv_interval);
45e68618 9
a7c7cad7 10use PVE::SafeSyslog;
0854fb22
DM
11use PVE::Tools;
12use PVE::INotify;
0757859a 13use PVE::APIClient::LWP;
b0d26b8f 14use PVE::Network;
0854fb22 15
1fb2ab76 16use PMG::Utils;
a7c7cad7 17use PMG::Config;
9f67f5b3 18use PMG::ClusterConfig;
db303db4
DM
19use PMG::RuleDB;
20use PMG::RuleCache;
9430b6d4 21use PMG::MailQueue;
0757859a 22use PMG::Fetchmail;
c56a7321 23use PMG::Ticket;
0854fb22 24
8737f93a
DM
# Resolve the IP address of the cluster node called $nodename.
#
# The node is looked up in the cluster configuration first. For a configured
# node the result depends on the calling context: scalar context yields the
# IP only, list context yields ($ip, $address_family).
#
# If the node is not configured, the local IP is returned for 'localhost' or
# our own node name; any other name is resolved through
# PVE::Network::get_ip_from_hostname(), which also receives $noerr.
sub remote_node_ip {
    my ($nodename, $noerr) = @_;

    my $cinfo = PMG::ClusterConfig->new();

    for my $node (values %{$cinfo->{ids}}) {
        next if $node->{name} ne $nodename;

        my $addr = $node->{ip};
        return wantarray
            ? ($addr, PVE::Tools::get_host_address_family($addr))
            : $addr;
    }

    # fallback: try to get IP by other means
    my $is_local = $nodename eq 'localhost' || $nodename eq PVE::INotify::nodename();
    return PVE::Network::get_local_ip() if $is_local;

    return PVE::Network::get_ip_from_hostname($nodename, $noerr);
}
46
d2e43f9e
DM
# Return the name of the cluster master node, or 'localhost' when the
# configuration has no master entry.
#
# $cinfo is optional; when it is not supplied the cluster configuration is
# loaded via PMG::ClusterConfig->new().
sub get_master_node {
    my ($cinfo) = @_;

    $cinfo ||= PMG::ClusterConfig->new();

    my $master = $cinfo->{master};

    return defined($master) ? $master->{name} : 'localhost';
}
56
cba17aeb
DM
# Read the local API certificate (/etc/pmg/pmg-api.pem) and return its
# SHA256 fingerprint as reported by Net::SSLeay::X509_get_fingerprint().
#
# Dies with a message naming the certificate path when the file cannot be
# opened or parsed, or when no fingerprint can be computed.
sub read_local_ssl_cert_fingerprint {
    my $cert_path = "/etc/pmg/pmg-api.pem";

    my $cert;
    eval {
        # a failed open yields a false handle instead of an exception
        my $bio = Net::SSLeay::BIO_new_file($cert_path, 'r')
            or die "unable to open file\n";

        # always release the BIO, even if parsing the PEM data dies
        $cert = eval { Net::SSLeay::PEM_read_bio_X509($bio) };
        my $read_err = $@;
        Net::SSLeay::BIO_free($bio);
        die $read_err if $read_err;
    };
    if (my $err = $@) {
        chomp $err;
        die "unable to read certificate '$cert_path' - $err\n";
    }

    # plain truth test: covers undef as well as a NULL pointer returned as 0
    if (!$cert) {
        die "unable to read certificate '$cert_path' - got empty value\n";
    }

    my $fp = eval { Net::SSLeay::X509_get_fingerprint($cert, 'sha256') };
    my $fp_err = $@;

    # the X509 object is owned by us and must be freed on every path
    Net::SSLeay::X509_free($cert);

    if ($fp_err) {
        chomp $fp_err;
        die "unable to get fingerprint for '$cert_path' - $fp_err\n";
    }

    if (!defined($fp) || $fp eq '') {
        die "unable to get fingerprint for '$cert_path' - got empty value\n";
    }

    return $fp;
}
0854fb22 88
cba17aeb
DM
my $hostrsapubkey_fn = '/etc/ssh/ssh_host_rsa_key.pub';
my $rootrsakey_fn = '/root/.ssh/id_rsa';
my $rootrsapubkey_fn = '/root/.ssh/id_rsa.pub';

# Gather the data describing the local node.
#
# Returns a hash reference with the keys name, ip, hostrsapubkey,
# rootrsapubkey and fingerprint. A root RSA key pair is generated with
# ssh-keygen when the public key file does not exist yet.
#
# Dies when one of the public key files does not match the pattern from
# PMG::ClusterConfig::Node::valid_ssh_pubkey_regex().
sub read_local_cluster_info {

    my $sshpubkeypattern;

    # read the first line of a public key file and reduce it to the bare
    # key data (leading 'ssh-rsa' and trailing 'root@host' stripped)
    my $read_bare_key = sub {
        my ($fn) = @_;

        my $key = PVE::Tools::file_read_firstline($fn);
        $key =~ s/^.*ssh-rsa\s+//i;
        $key =~ s/\s+root\@\S+\s*$//i;

        $sshpubkeypattern //= PMG::ClusterConfig::Node::valid_ssh_pubkey_regex();
        die "unable to parse ${fn}\n"
            if $key !~ m/$sshpubkeypattern/;

        return $key;
    };

    my $hostrsapubkey = $read_bare_key->($hostrsapubkey_fn);

    my $res = {
        name => PVE::INotify::nodename(),
        ip => PVE::Network::get_local_ip(),
        hostrsapubkey => $hostrsapubkey,
    };

    if (! -f $rootrsapubkey_fn) {
        # remove a stale private key before generating a new pair
        unlink $rootrsakey_fn;
        PMG::Utils::run_silent_cmd([
            'ssh-keygen', '-t', 'rsa', '-N', '', '-b', '2048', '-f', $rootrsakey_fn,
        ]);
    }

    $res->{rootrsapubkey} = $read_bare_key->($rootrsapubkey_fn);

    $res->{fingerprint} = read_local_ssl_cert_fingerprint();

    return $res;
}
133
# X509 Certificate cache helper

my $cert_cache_nodes = {};          # node name => fingerprint
my $cert_cache_timestamp = time();  # time of the last cache rebuild
my $cert_cache_fingerprints = {};   # fingerprint => 1

# Rebuild both fingerprint caches from the current cluster configuration
# and reset the cache timestamp. Entries lacking a name or a fingerprint
# are skipped.
sub update_cert_cache {

    # reset everything before loading the configuration
    $cert_cache_timestamp = time();
    $cert_cache_fingerprints = {};
    $cert_cache_nodes = {};

    my $cinfo = PMG::ClusterConfig->new();

    for my $entry (values %{$cinfo->{ids}}) {
        my ($node, $fp) = @{$entry}{qw(name fingerprint)};
        next if !($node && $fp);

        $cert_cache_fingerprints->{$fp} = 1;
        $cert_cache_nodes->{$node} = $fp;
    }
}
158
# Make sure the certificate cache knows $node: the cache is rebuilt when a
# node name is given and no fingerprint is cached for it yet.
sub initialize_cert_cache {
    my ($node) = @_;

    if (defined($node) && !defined($cert_cache_nodes->{$node})) {
        update_cert_cache();
    }
}
166
# Check whether the SHA256 fingerprint of $cert is one of the cached
# (pinned) cluster node fingerprints.
#
# Returns 1 on a match and 0 otherwise, including when no fingerprint can
# be computed. On a miss the cache is rebuilt and checked again, but only
# if the last rebuild is at least 60 seconds old.
sub check_cert_fingerprint {
    my ($cert) = @_;

    # clear cache every 30 minutes at least
    my $cache_age = time() - $cert_cache_timestamp;
    update_cert_cache() if $cache_age >= 30 * 60;

    # get fingerprint of server certificate
    my $fp = eval { Net::SSLeay::X509_get_fingerprint($cert, 'sha256') };
    return 0 if $@ || !defined($fp) || $fp eq ''; # error

    # looks at the cache as it is at call time, so it sees a rebuilt cache
    my $is_pinned = sub {
        return exists($cert_cache_fingerprints->{$fp}) ? 1 : 0;
    };

    return 1 if $is_pinned->();

    # clear cache and retry at most once every minute
    if (time() - $cert_cache_timestamp >= 60) {
        syslog ('info', "Could not verify remote node certificate '$fp' with list of pinned certificates, refreshing cache");
        update_cert_cache();
        return $is_pinned->();
    }

    return 0;
}
198
58072364
DM
my $sshglobalknownhosts = "/etc/ssh/ssh_known_hosts2";
my $rootsshauthkeys = "/root/.ssh/authorized_keys";
my $ssh_rsa_id = "/root/.ssh/id_rsa.pub";

# Rewrite the global known hosts file and root's authorized_keys from the
# node list in $cinfo->{ids}.
#
# The known hosts file gets one entry per node IP and one per node name.
# authorized_keys is assembled from our own public key, the root keys of
# all nodes and the current file content; repeated ssh-rsa/ssh-dsa keys are
# dropped. Each file is only written when its content changes.
sub update_ssh_keys {
    my ($cinfo) = @_;

    my @nodes = values %{$cinfo->{ids}};

    # --- known hosts ---
    my @host_entries;
    for my $node (@nodes) {
        push @host_entries, "$node->{ip} ssh-rsa $node->{hostrsapubkey}\n";
        push @host_entries, "$node->{name} ssh-rsa $node->{hostrsapubkey}\n";
    }
    my $known_hosts = join('', @host_entries);

    my $current_hosts = -f $sshglobalknownhosts
        ? PVE::Tools::file_get_contents($sshglobalknownhosts, 1024*1024)
        : '';

    PVE::Tools::file_set_contents($sshglobalknownhosts, $known_hosts)
        if $current_hosts ne $known_hosts;

    # --- authorized keys ---
    my $data = '';

    # always add ourself
    if (-f $ssh_rsa_id) {
        my $pub = PVE::Tools::file_get_contents($ssh_rsa_id);
        chomp($pub);
        $data .= "$pub\n";
    }

    $data .= "ssh-rsa $_->{rootrsapubkey} root\@$_->{name}\n" for @nodes;

    # keep whatever is in the file already
    if (-f $rootsshauthkeys) {
        my $mykey = PVE::Tools::file_get_contents($rootsshauthkeys, 128*1024);
        chomp($mykey);
        $data .= "$mykey\n";
    }

    # drop repeated keys, the first occurrence wins
    my %seen;
    my $newdata = "";
    for my $line (split(/\n/, $data)) {
        if ($line !~ /^#/) {
            my (undef, undef, $key) = $line =~ m/(^|\s)ssh-(rsa|dsa)\s+(\S+)\s+\S+$/;
            next if defined($key) && $seen{$key}++;
        }
        $newdata .= "$line\n";
    }

    my $current_keys = -f $rootsshauthkeys
        ? PVE::Tools::file_get_contents($rootsshauthkeys, 1024*1024)
        : '';

    PVE::Tools::file_set_contents($rootsshauthkeys, $newdata, 0600)
        if $current_keys ne $newdata;
}
256
a7c7cad7
DM
my $cfgdir = '/etc/pmg';
my $syncdir = "$cfgdir/master";

# Move a file from the sync directory to its final place if it changed.
#
# $filename is relative to $syncdir; $dstfn defaults to the same name below
# $cfgdir. When the synced file does not exist the destination is removed
# and nothing is returned. Otherwise returns 0 if source and destination
# have equal content and 1 after the destination was replaced.
my $cond_commit_synced_file = sub {
    my ($filename, $dstfn) = @_;

    my $srcfn = "$syncdir/$filename";
    $dstfn //= "$cfgdir/$filename";

    if (! -f $srcfn) {
        unlink $dstfn;
        return;
    }

    my $new = PVE::Tools::file_get_contents($srcfn, 1024*1024);

    return 0
        if -f $dstfn && $new eq PVE::Tools::file_get_contents($dstfn, 1024*1024);

    # set mtime (touch) to avoid time drift problems
    utime(undef, undef, $srcfn);

    rename($srcfn, $dstfn)
        or die "cond_rename_file '$filename' failed - $!\n";

    print STDERR "updated $dstfn\n";

    return 1;
};
288
8b455bef
SI
# Build the argument list for a non-interactive (BatchMode) ssh call as
# root. A true $host_key_alias adds '-o HostKeyAlias=<alias>'; all further
# arguments are appended unchanged. Returns an array reference.
my $ssh_command = sub {
    my ($host_key_alias, @args) = @_;

    my @cmd = ('ssh', '-l', 'root', '-o', 'BatchMode=yes');
    push @cmd, ('-o', "HostKeyAlias=${host_key_alias}") if $host_key_alias;

    return [@cmd, @args];
};
297
2930bbb2
SI
# Fetch the SHA256 fingerprint of the API certificate of node $ni (a hash
# reference with at least 'name' and 'ip') by running openssl on that node
# over ssh.
#
# Dies when the command fails or its output contains no fingerprint.
sub get_remote_cert_fingerprint {
    my ($ni) = @_;

    my $remote_cmd = 'openssl x509 -noout -fingerprint -sha256 -in /etc/pmg/pmg-api.pem';
    my $ssh_cmd = $ssh_command->($ni->{name}, $ni->{ip}, $remote_cmd);

    my $fp;
    my $parse_line = sub {
        my ($line) = @_;
        return if $line !~ m/SHA256 Fingerprint=((?:[A-Fa-f0-9]{2}:){31}[A-Fa-f0-9]{2})/;
        $fp = $1;
    };

    eval {
        PVE::Tools::run_command($ssh_cmd, outfunc => $parse_line);
        die "parsing failed\n" if !$fp;
    };
    die "unable to get remote node fingerprint from '$ni->{name}': $@\n" if $@;

    return $fp;
}
320
b61378e1
SI
# Ask the master node to update the certificate fingerprints by POSTing to
# its '/config/cluster/update-fingerprints' API path as root@pam.
#
# Dies when $cinfo has no master entry. Always returns undef.
sub trigger_update_fingerprints {
    my ($cinfo) = @_;

    my $master = $cinfo->{master} || die "unable to lookup master node\n";

    my %accepted_fp = ($master->{fingerprint} => 1);

    # if running on master the current fingerprint for the API-connection is
    # needed in addition (to prevent races with restarting pmgproxy)
    if ($cinfo->{local}->{type} eq 'master') {
        my $local_fp = PMG::Cluster::read_local_ssl_cert_fingerprint();
        $accepted_fp{$local_fp} = 1;
    }

    my $conn = PVE::APIClient::LWP->new(
        ticket => PMG::Ticket::assemble_ticket('root@pam'),
        csrftoken => PMG::Ticket::assemble_csrf_prevention_token('root@pam'),
        cookie_name => 'PMGAuthCookie',
        host => $master->{ip},
        cached_fingerprints => \%accepted_fp,
    );

    $conn->post("/config/cluster/update-fingerprints", {});

    return undef;
}
347
c9dae0df
DM
# Build the argument list for a quiet rsync call that uses the ssh command
# line from $ssh_command (including the host key alias) as remote shell.
# @args are appended unchanged. Returns an array reference.
my $rsync_command = sub {
    my ($host_key_alias, @args) = @_;

    my $ssh_argv = $ssh_command->($host_key_alias);
    my $rsh_opt = '--rsh=' . join(' ', @$ssh_argv);

    return ['rsync', $rsh_opt, '-q', @args];
};
357
# Copy the quarantine files listed in the file $flistname from the spool
# directory of the remote node ($host_ip, ssh host key alias $host_name)
# into the local spool directory using rsync --files-from.
sub sync_quarantine_files {
    my ($host_ip, $host_name, $flistname, $rcid) = @_;

    my $spooldir = $PMG::MailQueue::spooldir;

    # create the per-node directory; the mkdir results are not checked
    mkdir $_ for ("$spooldir/cluster/", "$spooldir/cluster/$rcid");

    my @rsync_args = (
        '--timeout', '10',
        "[${host_ip}]:$spooldir", $spooldir,
        '--files-from', $flistname,
    );

    PVE::Tools::run_command($rsync_command->($host_name, @rsync_args));
}
373
# Mirror the spool directory of cluster node id $rcid from the remote host
# ($host_ip, ssh host key alias $host_name) into the same local path. Only
# the spam/ and virus/ trees, two levels deep, are included.
sub sync_spooldir {
    my ($host_ip, $host_name, $rcid) = @_;

    my $spooldir = $PMG::MailQueue::spooldir;
    my $clusterdir = "$spooldir/cluster/";
    my $syncdir = "$spooldir/cluster/$rcid";

    # the mkdir results are not checked
    mkdir $clusterdir;
    mkdir $syncdir;

    my @filters = map { ('--include', $_) }
        qw(spam/ spam/* spam/*/* virus/ virus/* virus/*/*);
    push @filters, '--exclude', '*';

    my $cmd = $rsync_command->(
        $host_name, '-aq', '--timeout', '10', "[${host_ip}]:$syncdir/", $syncdir, @filters);

    PVE::Tools::run_command($cmd);
}
394
# Mirror the whole '<spooldir>/cluster/' directory from the remote host
# ($host_ip, ssh host key alias $host_name) into the same local path.
sub sync_master_quar {
    my ($host_ip, $host_name) = @_;

    my $syncdir = $PMG::MailQueue::spooldir . '/cluster/';

    # the mkdir result is not checked
    mkdir $syncdir;

    my @rsync_args = ('-aq', '--timeout', '10', "[${host_ip}]:$syncdir", $syncdir);

    PVE::Tools::run_command($rsync_command->($host_name, @rsync_args));
}
408
# Pull the configuration from the master node and activate it locally.
#
# The files are first copied with rsync into the sync directory
# ($syncdir), the synced cluster.conf is verified, and then the individual
# files and directories are committed to their final location.
#
# Dies when the rsync fails, when the remote host is not the cluster
# master according to the synced cluster.conf, or when the local node is
# not a cluster member or is itself the master.
#
# Returns a hash reference naming services that need a restart;
# 'pmg-smtp-filter' is set when one of the SpamAssassin files was replaced.
# $noreload is accepted but not used here.
sub sync_config_from_master {
    my ($master_name, $master_ip, $noreload) = @_;

    # start with an empty sync directory
    mkdir $syncdir;
    File::Path::remove_tree($syncdir, {keep_root => 1});

    my $sa_conf_dir = "/etc/mail/spamassassin";
    my $sa_custom_cf = "custom.cf";
    my $sa_rules_cf = "pmg-scores.cf";

    # all remote sources are passed to rsync as one argument
    my $remote_sources = join(' ',
        "[${master_ip}]:$cfgdir/*",
        "${sa_conf_dir}/${sa_custom_cf}",
        "${sa_conf_dir}/${sa_rules_cf}",
    );

    my @excludes = map { ('--exclude', $_) }
        ('master/', '*~', '*.db', 'pmg-api.pem', 'pmg-tls.pem');

    my $sync_cmd = $rsync_command->(
        $master_name, '-aq', $remote_sources, "$syncdir/", @excludes);

    PVE::Tools::run_command(
        $sync_cmd, errmsg => "syncing master configuration from '${master_ip}' failed");

    # verify that the remote host is cluster master
    open(my $fh, '<', "$syncdir/cluster.conf")
        or die "unable to open synced cluster.conf - $!\n";

    my $cinfo = PMG::ClusterConfig::read_cluster_conf('cluster.conf', $fh);

    my $master = $cinfo->{master};
    die "host '$master_ip' is not cluster master\n"
        if !$master || ($master->{ip} ne $master_ip);

    my $role = $cinfo->{'local'}->{type} // '-';

    die "local node '$cinfo->{local}->{name}' not part of cluster\n"
        if $role eq '-';

    die "local node '$cinfo->{local}->{name}' is new cluster master\n"
        if $role eq 'master';

    $cond_commit_synced_file->('cluster.conf');

    update_ssh_keys($cinfo); # rewrite ssh keys

    PMG::Fetchmail::update_fetchmail_default(0); # disable on slave

    my @files = qw(
        pmg-authkey.key
        pmg-authkey.pub
        pmg-csrf.key
        ldap.conf
        user.conf
        tfa.json
        domains
        mynetworks
        transport
        tls_policy
        tls_inbound_domains
        fetchmailrc
    );
    $cond_commit_synced_file->($_) for @files;

    # whole directories are mirrored with a local rsync
    for my $dir (qw(templates dkim pbs acme)) {
        my $srcdir = "$syncdir/$dir";
        next if ! -d $srcdir;

        PVE::Tools::run_command(
            ['rsync', '-aq', '--delete-after', "$srcdir/", "$cfgdir/$dir"]);
    }

    my $force_restart = {};

    for my $file ($sa_custom_cf, $sa_rules_cf) {
        my $changed = $cond_commit_synced_file->($file, "${sa_conf_dir}/${file}");
        $force_restart->{'pmg-smtp-filter'} = 1 if $changed;
    }

    $cond_commit_synced_file->('pmg.conf');

    return $force_restart;
}
504
db303db4
DM
# Replace the local rule database with a copy of the one on node $ni.
#
# $ldb is the local and $rdb the remote database handle, $ticket the
# authentication ticket for the API connection to $ni->{ip}. Nothing is
# done when the remote rule database digest equals the local one.
#
# The local tables are emptied and refilled inside one local transaction,
# which is rolled back (and the error re-thrown) if anything fails. The
# remote transaction only serves to read a consistent snapshot and is
# always rolled back.
sub sync_ruledb_from_master {
    my ($ldb, $rdb, $ni, $ticket) = @_;

    my $ruledb = PMG::RuleDB->new($ldb);
    my $rulecache = PMG::RuleCache->new($ruledb);

    my $conn = PVE::APIClient::LWP->new(
        ticket => $ticket,
        cookie_name => 'PMGAuthCookie',
        host => $ni->{ip},
        cached_fingerprints => {
            $ni->{fingerprint} => 1,
        });

    my $digest = $conn->get("/config/ruledb/digest", {});

    return if $digest eq $rulecache->{digest}; # no changes

    syslog('info', "detected rule database changes - starting sync from '$ni->{ip}'");

    eval {
        $ldb->begin_work;

        $ldb->do("DELETE FROM Rule");
        $ldb->do("DELETE FROM RuleGroup");
        $ldb->do("DELETE FROM ObjectGroup");
        $ldb->do("DELETE FROM Object");
        $ldb->do("DELETE FROM Attribut");

        eval {
            $rdb->begin_work;

            # read a consistent snapshot
            $rdb->do("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");

            PMG::DBTools::copy_table($ldb, $rdb, "Rule");
            PMG::DBTools::copy_table($ldb, $rdb, "RuleGroup");
            PMG::DBTools::copy_table($ldb, $rdb, "ObjectGroup");
            PMG::DBTools::copy_table($ldb, $rdb, "Object", 'value');
            PMG::DBTools::copy_table($ldb, $rdb, "Attribut", 'value');
        };
        # save the error right away: $@ is a global that any code running
        # inside rollback() may reset, which would hide a failed copy
        my $copy_err = $@;

        $rdb->rollback; # end transaction

        die $copy_err if $copy_err;

        # update sequences

        $ldb->do("SELECT setval('rule_id_seq', max(id)+1) FROM Rule");
        $ldb->do("SELECT setval('object_id_seq', max(id)+1) FROM Object");
        $ldb->do("SELECT setval('objectgroup_id_seq', max(id)+1) FROM ObjectGroup");

        $ldb->commit;
    };
    if (my $err = $@) {
        $ldb->rollback;
        die $err;
    }

    PMG::DBTools::reload_ruledb();

    syslog('info', "finished rule database sync from host '$ni->{ip}'");
}
568
9430b6d4
DM
# Copy new quarantine entries of cluster node id $ni->{cid} from the remote
# database $rdb into the local database $ldb, fetch the matching mail files
# with rsync, and finally apply receiver status changes.
#
# New CMailStore rows are copied in batches of 1000 until a batch is not
# full or 100000 rows were copied in this call. Each batch runs in its own
# local transaction together with the file transfer.
#
# $rsynctime_ref is an optional scalar reference; the time spent in rsync
# is added to it.
#
# Returns the number of copied CMailStore rows. Dies after rolling back the
# current local transaction when a step fails.
sub sync_quarantine_db {
    my ($ldb, $rdb, $ni, $rsynctime_ref) = @_;

    # core module, loaded here so that this sub does not depend on the
    # module's import list
    require File::Temp;

    my $rcid = $ni->{cid};

    my $maxmails = 100000;

    my $mscount = 0;

    my $ctime = PMG::DBTools::get_remote_time($rdb);

    my $maxcount = 1000;

    my $count;

    PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastid_CMailStore', -1, undef);

    do { # get new values

        $count = 0;

        my $flistname;

        eval {
            $ldb->begin_work;

            # Use an unpredictable, exclusively created file. A fixed name
            # in /tmp opened for writing could be pre-created or symlinked
            # by another local user.
            (my $flistfh, $flistname) = File::Temp::tempfile(
                'quarantinefilelist.XXXXXX', DIR => '/tmp', UNLINK => 0);

            my $lastid = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastid_CMailStore');

            # sync CMailStore

            my $sth = $rdb->prepare(
                "SELECT * from CMailstore WHERE cid = ? AND rid > ? " .
                "ORDER BY cid,rid LIMIT ?");
            $sth->execute($rcid, $lastid, $maxcount);

            my $maxid;
            my $callback = sub {
                my $ref = shift;
                $maxid = $ref->{rid};
                my $filename = $ref->{file};
                # skip files generated before cluster was created
                return if $filename !~ m!^cluster/!;
                print $flistfh "$filename\n";
            };

            my $attrs = [qw(cid rid time qtype bytes spamlevel info sender header file)];
            $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CMailStore', $attrs, $callback);

            # buffered write errors only show up at close; an incomplete
            # list would silently skip mail files
            close($flistfh)
                or die "unable to write file '$flistname' - $!\n";

            my $starttime = [ gettimeofday() ];
            sync_quarantine_files($ni->{ip}, $ni->{name}, $flistname, $rcid);
            $$rsynctime_ref += tv_interval($starttime) if defined($rsynctime_ref);

            if ($maxid) {
                # sync CMSReceivers

                $sth = $rdb->prepare(
                    "SELECT * from CMSReceivers WHERE " .
                    "CMailStore_CID = ? AND CMailStore_RID > ? " .
                    "AND CMailStore_RID <= ?");
                $sth->execute($rcid, $lastid, $maxid);

                $attrs = [qw(cmailstore_cid cmailstore_rid pmail receiver ticketid status mtime)];
                PMG::DBTools::copy_selected_data($ldb, $sth, 'CMSReceivers', $attrs);

                PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastid_CMailStore', $maxid);
            }

            $ldb->commit;
        };
        my $err = $@;

        # the list file is only needed for the rsync call above
        unlink $flistname if defined($flistname);

        if ($err) {
            $ldb->rollback;
            die $err;
        }

        $mscount += $count;

    } while (($count >= $maxcount) && ($mscount < $maxmails));

    PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastmt_CMSReceivers', 0, undef);

    eval { # synchronize status updates
        $ldb->begin_work;

        my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers');

        my $sth = $rdb->prepare ("SELECT * from CMSReceivers WHERE mtime >= ? AND status != 'N'");
        $sth->execute($lastmt);

        my $update_sth = $ldb->prepare(
            "UPDATE CMSReceivers SET status = ? WHERE " .
            "CMailstore_CID = ? AND CMailstore_RID = ? AND TicketID = ?");
        while (my $ref = $sth->fetchrow_hashref()) {
            $update_sth->execute($ref->{status}, $ref->{cmailstore_cid},
                                 $ref->{cmailstore_rid}, $ref->{ticketid});
        }

        # remember the remote time taken before the sync started
        PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers', $ctime);

        $ldb->commit;
    };
    if (my $err = $@) {
        $ldb->rollback;
        die $err;
    }

    return $mscount;
}
685
# Copy new statistic entries (CStatistic and the matching CReceivers rows)
# of cluster node id $ni->{cid} from the remote database $rdb into the
# local database $ldb.
#
# Rows are copied in batches of 1000, each in its own local transaction,
# until a batch is not full or 100000 rows were copied in this call.
# Returns the number of copied CStatistic rows; dies after a rollback when
# a batch fails.
sub sync_statistic_db {
    my ($ldb, $rdb, $ni) = @_;

    my $rcid = $ni->{cid};

    my $maxmails = 100000;
    my $maxcount = 1000;

    my $stat_attrs = [qw(cid rid time bytes direction spamlevel ptime virusinfo sender)];
    my $receiver_attrs = [qw(cstatistic_cid cstatistic_rid blocked receiver)];

    my $mscount = 0;
    my $count;

    PMG::DBTools::create_clusterinfo_default(
        $ldb, $rcid, 'lastid_CStatistic', -1, undef);

    do { # get new values
        $count = 0;

        eval {
            $ldb->begin_work;

            my $lastid = PMG::DBTools::read_int_clusterinfo(
                $ldb, $rcid, 'lastid_CStatistic');

            # sync CStatistic
            my $sth = $rdb->prepare(
                "SELECT * from CStatistic WHERE cid = ? AND rid > ? ORDER BY cid, rid LIMIT ?");
            $sth->execute($rcid, $lastid, $maxcount);

            # remember the rid of the last row handed to the callback
            my $maxid;
            my $track_maxid = sub {
                my ($row) = @_;
                $maxid = $row->{rid};
            };

            $count += PMG::DBTools::copy_selected_data(
                $ldb, $sth, 'CStatistic', $stat_attrs, $track_maxid);

            if ($maxid) {
                # sync CReceivers
                $sth = $rdb->prepare(
                    "SELECT * from CReceivers WHERE CStatistic_CID = ? AND CStatistic_RID > ? AND CStatistic_RID <= ?");
                $sth->execute($rcid, $lastid, $maxid);

                PMG::DBTools::copy_selected_data($ldb, $sth, 'CReceivers', $receiver_attrs);
            }

            PMG::DBTools::write_maxint_clusterinfo ($ldb, $rcid, 'lastid_CStatistic', $maxid);

            $ldb->commit;
        };
        if (my $err = $@) {
            $ldb->rollback;
            die $err;
        }

        $mscount += $count;

    } while (($count >= $maxcount) && ($mscount < $maxmails));

    return $mscount;
}
756
# Generic helper for tables that are synchronized by modification time.
#
# $selectfunc->($ctime, $lastmt) must return the SQL text that selects the
# rows to transfer from the remote database $rdb; $mergefunc->($row) is
# called for every fetched row (a hash reference) and has to store it
# locally. The last sync time is kept in the cluster info entry
# "lastmt_$table" of node id $ni->{cid}.
#
# Rows are merged in local transactions of up to 1000 rows. Returns the
# number of merged rows; on error the local transaction is rolled back and
# the error is re-thrown.
my $sync_generic_mtime_db = sub {
    my ($ldb, $rdb, $ni, $table, $selectfunc, $mergefunc) = @_;

    my $cid = $ni->{cid};
    my $info_key = "lastmt_$table";

    my $ctime = PMG::DBTools::get_remote_time($rdb);

    PMG::DBTools::create_clusterinfo_default($ldb, $cid, $info_key, 0, undef);
    my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $cid, $info_key);

    my $sth = $rdb->prepare($selectfunc->($ctime, $lastmt));
    $sth->execute();

    my $updates = 0;

    eval {
        # use transaction to speedup things
        my $batch_size = 1000; # UPDATE MAX ENTRIES AT ONCE
        my $pending = 0;

        while (my $row = $sth->fetchrow_hashref()) {
            $ldb->begin_work if $pending == 0;

            $mergefunc->($row);
            $pending++;

            if ($pending >= $batch_size) {
                $pending = 0;
                $ldb->commit;
            }

            $updates++;
        }

        # commit the last, partially filled batch
        $ldb->commit if $pending > 0;
    };
    if (my $err = $@) {
        $ldb->rollback;
        die $err;
    }

    PMG::DBTools::write_maxint_clusterinfo($ldb, $cid, $info_key, $ctime);

    return $updates;
};
9430b6d4 799
5e1408fd
DM
# Merge the LocalStat rows of cluster node id $ni->{cid} that changed since
# the last sync from the remote database $rdb into the local database
# $dbh. Existing rows (same Time and CID) are overwritten. Returns the
# number of merged rows.
sub sync_localstat_db {
    my ($dbh, $rdb, $ni) = @_;

    my $rcid = $ni->{cid};

    my $select_changed = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from LocalStat WHERE mtime >= $lastmt AND cid = $rcid";
    };

    my $upsert = $dbh->prepare(
        'INSERT INTO LocalStat (Time, RBLCount, PregreetCount, CID, MTime) ' .
        'VALUES (?, ?, ?, ?, ?) ' .
        'ON CONFLICT (Time, CID) DO UPDATE SET ' .
        'RBLCount = excluded.RBLCount, PregreetCount = excluded.PregreetCount, MTime = excluded.MTime');

    my $merge_row = sub {
        my ($row) = @_;
        $upsert->execute(@{$row}{qw(time rblcount pregreetcount cid mtime)});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'LocalStat', $select_changed, $merge_row);
}
824
9430b6d4
DM
# Merge the CGreylist rows that are not yet expired (relative to the remote
# time), changed since the last sync and have a CID other than 0 from the
# remote database $rdb into the local database $dbh. Returns the number of
# merged rows.
sub sync_greylist_db {
    my ($dbh, $rdb, $ni) = @_;

    my $select_changed = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from CGreylist WHERE extime >= $ctime AND " .
            "mtime >= $lastmt AND CID != 0";
    };

    my $upsert = $dbh->prepare(PMG::DBTools::cgreylist_merge_sql());

    my $merge_row = sub {
        my ($row) = @_;

        # values without a '/<prefix length>' suffix get '.0/24' appended
        my $ipnet = $row->{ipnet};
        $ipnet .= '.0/24' if $ipnet !~ /\/\d+$/;

        $upsert->execute(
            $ipnet,
            @{$row}{qw(sender receiver instance rctime extime delay blocked passed)},
            0,
            $row->{cid},
        );
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'CGreylist', $select_changed, $merge_row);
}
848
# Merge the UserPrefs rows that changed since the last sync from the remote
# database $rdb into the local database $dbh. For an existing row (same
# PMail and Name) the remote data only wins when its MTime is not older
# than the local one. Returns the number of merged rows.
sub sync_userprefs_db {
    my ($dbh, $rdb, $ni) = @_;

    my $select_changed = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from UserPrefs WHERE mtime >= $lastmt";
    };

    my $upsert = $dbh->prepare(
        "INSERT INTO UserPrefs (PMail, Name, Data, MTime) " .
        'VALUES (?, ?, ?, ?) ' .
        'ON CONFLICT (PMail, Name) DO UPDATE SET ' .
        # Note: MTime = 0 ==> this is just a copy from somewhere else, not modified
        'MTime = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN 0 ELSE UserPrefs.MTime END, ' .
        'Data = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN excluded.Data ELSE UserPrefs.Data END');

    my $merge_row = sub {
        my ($row) = @_;
        $upsert->execute(@{$row}{qw(pmail name data mtime)});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'UserPrefs', $select_changed, $merge_row);
}
874
# Merge the DomainStat rows that changed since the last sync from the
# remote database $rdb into the local database $dbh. Existing rows (same
# Time and Domain) are overwritten. Returns the number of merged rows.
sub sync_domainstat_db {
    my ($dbh, $rdb, $ni) = @_;

    my $select_changed = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from DomainStat WHERE mtime >= $lastmt";
    };

    my $upsert = $dbh->prepare(
        'INSERT INTO Domainstat ' .
        '(Time,Domain,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
        'BouncesIn,BouncesOut,PTimeSum,Mtime) ' .
        'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
        'ON CONFLICT (Time, Domain) DO UPDATE SET ' .
        'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
        'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
        'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
        'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
        'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
        'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');

    # row keys in the order of the INSERT column list
    my @columns = qw(
        time domain countin countout bytesin bytesout
        virusin virusout spamin spamout
        bouncesin bouncesout ptimesum mtime
    );

    my $merge_row = sub {
        my ($row) = @_;
        $upsert->execute(@{$row}{@columns});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DomainStat', $select_changed, $merge_row);
}
908
# Merge the DailyStat rows that changed since the last sync from the remote
# database $rdb into the local database $dbh. Existing rows (same Time)
# are overwritten. Returns the number of merged rows.
sub sync_dailystat_db {
    my ($dbh, $rdb, $ni) = @_;

    my $select_changed = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from DailyStat WHERE mtime >= $lastmt";
    };

    my $upsert = $dbh->prepare(
        'INSERT INTO DailyStat ' .
        '(Time,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
        'BouncesIn,BouncesOut,GreylistCount,SPFCount,RBLCount,PTimeSum,Mtime) ' .
        'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
        'ON CONFLICT (Time) DO UPDATE SET ' .
        'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
        'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
        'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
        'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
        'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
        'GreylistCount = excluded.GreylistCount, SPFCount = excluded.SpfCount, ' .
        'RBLCount = excluded.RBLCount, ' .
        'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');

    # row keys in the order of the INSERT column list
    my @columns = qw(
        time countin countout bytesin bytesout
        virusin virusout spamin spamout
        bouncesin bouncesout greylistcount
        spfcount rblcount ptimesum mtime
    );

    my $merge_row = sub {
        my ($row) = @_;
        $upsert->execute(@{$row}{@columns});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DailyStat', $select_changed, $merge_row);
}
945
# Merge the VirusInfo rows that changed since the last sync from the remote
# database $rdb into the local database $dbh. Existing rows (same Time and
# Name) are overwritten. Returns the number of merged rows.
sub sync_virusinfo_db {
    my ($dbh, $rdb, $ni) = @_;

    my $select_changed = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from VirusInfo WHERE mtime >= $lastmt";
    };

    my $upsert = $dbh->prepare(
        'INSERT INTO VirusInfo (Time,Name,Count,MTime) ' .
        'VALUES (?,?,?,?) ' .
        'ON CONFLICT (Time,Name) DO UPDATE SET ' .
        'Count = excluded.Count , MTime = excluded.MTime');

    my $merge_row = sub {
        my ($row) = @_;
        $upsert->execute(@{$row}{qw(time name count mtime)});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'VirusInfo', $select_changed, $merge_row);
}
968
# Fetch the quarantine and statistic data of nodes that are no longer part
# of the cluster from the master node.
#
# Every node id from 1 up to the master's 'maxcid' that is not listed in
# $cinfo->{ids} is treated as deleted. For each such id the spool
# directory and the quarantine and statistic tables are synced from the
# master ($masterni, database handle $masterdb) once; a marker file in the
# node's spool directory records that this was done.
#
# $rsynctime_ref is an optional scalar reference; the time spent in rsync,
# including the quarantine file transfer, is added to it.
sub sync_deleted_nodes_from_master {
    my ($ldb, $masterdb, $cinfo, $masterni, $rsynctime_ref) = @_;

    my $cid_hash = {}; # fast lookup
    foreach my $ni (values %{$cinfo->{ids}}) {
        $cid_hash->{$ni->{cid}} = $ni;
    }

    my $spooldir = $PMG::MailQueue::spooldir;

    my $maxcid = $cinfo->{master}->{maxcid} // 0;

    for my $rcid (1 .. $maxcid) {
        next if $cid_hash->{$rcid};

        my $done_marker = "$spooldir/cluster/$rcid/.synced-deleted-node";

        next if -f $done_marker; # already synced

        syslog('info', "syncing deleted node $rcid from master '$masterni->{ip}'");

        my $starttime = [ gettimeofday() ];
        sync_spooldir($masterni->{ip}, $masterni->{name}, $rcid);
        $$rsynctime_ref += tv_interval($starttime) if defined($rsynctime_ref);

        # the data of the deleted node is served by the master
        my $fake_ni = {
            ip => $masterni->{ip},
            name => $masterni->{name},
            cid => $rcid,
        };

        # hand the counter on so the file transfer is accounted as well
        sync_quarantine_db($ldb, $masterdb, $fake_ni, $rsynctime_ref);

        sync_statistic_db ($ldb, $masterdb, $fake_ni);

        # Without the marker the node is synced again on the next run, so a
        # failure is logged rather than treated as fatal.
        if (open(my $fh, ">>", $done_marker)) {
            close($fh);
        } else {
            syslog('err', "unable to create marker file '$done_marker' - $!");
        }
    }
}
1009
1010
0854fb22 10111;