]> git.proxmox.com Git - pmg-api.git/blame - src/PMG/Cluster.pm
greylisting: drop unneeded Host column form cgreylist table
[pmg-api.git] / src / PMG / Cluster.pm
CommitLineData
0854fb22
DM
1package PMG::Cluster;
2
3use strict;
4use warnings;
45e68618 5use Data::Dumper;
0854fb22 6use Socket;
cfdf6608 7use File::Path;
9430b6d4 8use Time::HiRes qw (gettimeofday tv_interval);
45e68618 9
a7c7cad7 10use PVE::SafeSyslog;
0854fb22
DM
11use PVE::Tools;
12use PVE::INotify;
0757859a 13use PVE::APIClient::LWP;
b0d26b8f 14use PVE::Network;
0854fb22 15
1fb2ab76 16use PMG::Utils;
a7c7cad7 17use PMG::Config;
9f67f5b3 18use PMG::ClusterConfig;
db303db4
DM
19use PMG::RuleDB;
20use PMG::RuleCache;
9430b6d4 21use PMG::MailQueue;
0757859a 22use PMG::Fetchmail;
0854fb22 23
8737f93a
DM
24sub remote_node_ip {
25 my ($nodename, $noerr) = @_;
26
d8782874 27 my $cinfo = PMG::ClusterConfig->new();
8737f93a 28
45e68618 29 foreach my $entry (values %{$cinfo->{ids}}) {
8737f93a
DM
30 if ($entry->{name} eq $nodename) {
31 my $ip = $entry->{ip};
32 return $ip if !wantarray;
33 my $family = PVE::Tools::get_host_address_family($ip);
34 return ($ip, $family);
35 }
36 }
37
38 # fallback: try to get IP by other means
b0d26b8f 39 return PVE::Network::get_ip_from_hostname($nodename, $noerr);
8737f93a
DM
40}
41
d2e43f9e
DM
42sub get_master_node {
43 my ($cinfo) = @_;
44
d8782874 45 $cinfo = PMG::ClusterConfig->new() if !$cinfo;
d2e43f9e
DM
46
47 return $cinfo->{master}->{name} if defined($cinfo->{master});
48
49 return 'localhost';
50}
51
cba17aeb
DM
52sub read_local_ssl_cert_fingerprint {
53 my $cert_path = "/etc/pmg/pmg-api.pem";
0854fb22 54
cba17aeb
DM
55 my $cert;
56 eval {
57 my $bio = Net::SSLeay::BIO_new_file($cert_path, 'r');
58 $cert = Net::SSLeay::PEM_read_bio_X509($bio);
59 Net::SSLeay::BIO_free($bio);
60 };
61 if (my $err = $@) {
62 die "unable to read certificate '$cert_path' - $err\n";
63 }
0854fb22 64
cba17aeb
DM
65 if (!defined($cert)) {
66 die "unable to read certificate '$cert_path' - got empty value\n";
67 }
68
69 my $fp;
70 eval {
71 $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256');
72 };
73 if (my $err = $@) {
74 die "unable to get fingerprint for '$cert_path' - $err\n";
75 }
0854fb22 76
cba17aeb
DM
77 if (!defined($fp) || $fp eq '') {
78 die "unable to get fingerprint for '$cert_path' - got empty value\n";
79 }
0854fb22 80
cba17aeb
DM
81 return $fp;
82}
0854fb22 83
cba17aeb
DM
84my $hostrsapubkey_fn = '/etc/ssh/ssh_host_rsa_key.pub';
85my $rootrsakey_fn = '/root/.ssh/id_rsa';
86my $rootrsapubkey_fn = '/root/.ssh/id_rsa.pub';
0854fb22 87
cba17aeb
DM
88sub read_local_cluster_info {
89
90 my $res = {};
91
92 my $hostrsapubkey = PVE::Tools::file_read_firstline($hostrsapubkey_fn);
93 $hostrsapubkey =~ s/^.*ssh-rsa\s+//i;
94 $hostrsapubkey =~ s/\s+root\@\S+\s*$//i;
95
96 die "unable to parse ${hostrsapubkey_fn}\n"
97 if $hostrsapubkey !~ m/^[A-Za-z0-9\.\/\+]{200,}$/;
0854fb22
DM
98
99 my $nodename = PVE::INotify::nodename();
100
cba17aeb 101 $res->{name} = $nodename;
0854fb22 102
b0d26b8f 103 $res->{ip} = PVE::Network::get_ip_from_hostname($nodename);
0854fb22 104
cba17aeb 105 $res->{hostrsapubkey} = $hostrsapubkey;
0854fb22 106
cba17aeb
DM
107 if (! -f $rootrsapubkey_fn) {
108 unlink $rootrsakey_fn;
109 my $cmd = ['ssh-keygen', '-t', 'rsa', '-N', '', '-b', '2048',
110 '-f', $rootrsakey_fn];
1fb2ab76 111 PMG::Utils::run_silent_cmd($cmd);
cba17aeb
DM
112 }
113
114 my $rootrsapubkey = PVE::Tools::file_read_firstline($rootrsapubkey_fn);
115 $rootrsapubkey =~ s/^.*ssh-rsa\s+//i;
116 $rootrsapubkey =~ s/\s+root\@\S+\s*$//i;
117
118 die "unable to parse ${rootrsapubkey_fn}\n"
119 if $rootrsapubkey !~ m/^[A-Za-z0-9\.\/\+]{200,}$/;
120
121 $res->{rootrsapubkey} = $rootrsapubkey;
122
123 $res->{fingerprint} = read_local_ssl_cert_fingerprint();
124
125 return $res;
126}
127
128# X509 Certificate cache helper
129
130my $cert_cache_nodes = {};
131my $cert_cache_timestamp = time();
132my $cert_cache_fingerprints = {};
0854fb22 133
cba17aeb 134sub update_cert_cache {
0854fb22 135
cba17aeb
DM
136 $cert_cache_timestamp = time();
137
138 $cert_cache_fingerprints = {};
139 $cert_cache_nodes = {};
140
d8782874 141 my $cinfo = PMG::ClusterConfig->new();
cba17aeb
DM
142
143 foreach my $entry (values %{$cinfo->{ids}}) {
144 my $node = $entry->{name};
145 my $fp = $entry->{fingerprint};
146 if ($node && $fp) {
147 $cert_cache_fingerprints->{$fp} = 1;
148 $cert_cache_nodes->{$node} = $fp;
0854fb22
DM
149 }
150 }
151}
152
153# load and cache cert fingerprint once
154sub initialize_cert_cache {
155 my ($node) = @_;
156
cba17aeb 157 update_cert_cache()
0854fb22
DM
158 if defined($node) && !defined($cert_cache_nodes->{$node});
159}
160
161sub check_cert_fingerprint {
162 my ($cert) = @_;
163
164 # clear cache every 30 minutes at least
cba17aeb 165 update_cert_cache() if time() - $cert_cache_timestamp >= 60*30;
0854fb22
DM
166
167 # get fingerprint of server certificate
168 my $fp;
169 eval {
170 $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256');
171 };
172 return 0 if $@ || !defined($fp) || $fp eq ''; # error
173
174 my $check = sub {
175 for my $expected (keys %$cert_cache_fingerprints) {
176 return 1 if $fp eq $expected;
177 }
178 return 0;
179 };
180
cba17aeb 181 return 1 if $check->();
0854fb22
DM
182
183 # clear cache and retry at most once every minute
184 if (time() - $cert_cache_timestamp >= 60) {
185 syslog ('info', "Could not verify remote node certificate '$fp' with list of pinned certificates, refreshing cache");
186 update_cert_cache();
cba17aeb 187 return $check->();
0854fb22
DM
188 }
189
190 return 0;
191}
192
58072364
DM
193my $sshglobalknownhosts = "/etc/ssh/ssh_known_hosts2";
194my $rootsshauthkeys = "/root/.ssh/authorized_keys";
195my $ssh_rsa_id = "/root/.ssh/id_rsa.pub";
196
197sub update_ssh_keys {
198 my ($cinfo) = @_;
199
809dd92a 200 my $old = '';
58072364 201 my $data = '';
809dd92a 202
58072364
DM
203 foreach my $node (values %{$cinfo->{ids}}) {
204 $data .= "$node->{ip} ssh-rsa $node->{hostrsapubkey}\n";
205 $data .= "$node->{name} ssh-rsa $node->{hostrsapubkey}\n";
206 }
207
809dd92a
DM
208 $old = PVE::Tools::file_get_contents($sshglobalknownhosts, 1024*1024)
209 if -f $sshglobalknownhosts;
210
211 PVE::Tools::file_set_contents($sshglobalknownhosts, $data)
212 if $old ne $data;
58072364
DM
213
214 $data = '';
809dd92a 215 $old = '';
58072364
DM
216
217 # always add ourself
218 if (-f $ssh_rsa_id) {
219 my $pub = PVE::Tools::file_get_contents($ssh_rsa_id);
220 chomp($pub);
221 $data .= "$pub\n";
222 }
223
224 foreach my $node (values %{$cinfo->{ids}}) {
225 $data .= "ssh-rsa $node->{rootrsapubkey} root\@$node->{name}\n";
226 }
227
228 if (-f $rootsshauthkeys) {
a7c7cad7
DM
229 my $mykey = PVE::Tools::file_get_contents($rootsshauthkeys, 128*1024);
230 chomp($mykey);
231 $data .= "$mykey\n";
58072364
DM
232 }
233
234 my $newdata = "";
235 my $vhash = {};
236 my @lines = split(/\n/, $data);
237 foreach my $line (@lines) {
238 if ($line !~ /^#/ && $line =~ m/(^|\s)ssh-(rsa|dsa)\s+(\S+)\s+\S+$/) {
239 next if $vhash->{$3}++;
240 }
241 $newdata .= "$line\n";
242 }
243
809dd92a
DM
244 $old = PVE::Tools::file_get_contents($rootsshauthkeys, 1024*1024)
245 if -f $rootsshauthkeys;
246
247 PVE::Tools::file_set_contents($rootsshauthkeys, $newdata, 0600)
248 if $old ne $newdata;
58072364
DM
249}
250
a7c7cad7
DM
251my $cfgdir = '/etc/pmg';
252my $syncdir = "$cfgdir/master";
253
254my $cond_commit_synced_file = sub {
255 my ($filename, $dstfn) = @_;
256
257 $dstfn = "$cfgdir/$filename" if !defined($dstfn);
258 my $srcfn = "$syncdir/$filename";
259
260 if (! -f $srcfn) {
261 unlink $dstfn;
262 return;
263 }
264
265 my $new = PVE::Tools::file_get_contents($srcfn, 1024*1024);
266
267 if (-f $dstfn) {
268 my $old = PVE::Tools::file_get_contents($dstfn, 1024*1024);
269 return 0 if $new eq $old;
270 }
271
1dc946d4
DM
272 # set mtime (touch) to avoid time drift problems
273 utime(undef, undef, $srcfn);
274
a7c7cad7
DM
275 rename($srcfn, $dstfn) ||
276 die "cond_rename_file '$filename' failed - $!\n";
277
278 print STDERR "updated $dstfn\n";
279
280 return 1;
281};
282
8b455bef
SI
283my $ssh_command = sub {
284 my ($host_key_alias, @args) = @_;
285
286 my $cmd = ['ssh', '-l', 'root', '-o', 'BatchMode=yes'];
287 push @$cmd, '-o', "HostKeyAlias=${host_key_alias}" if $host_key_alias;
288 push @$cmd, @args if @args;
289 return $cmd;
290};
291
2930bbb2
SI
292sub get_remote_cert_fingerprint {
293 my ($ni) = @_;
294
295 my $ssh_cmd = $ssh_command->(
f9ee6324
TL
296 $ni->{name},
297 $ni->{ip},
298 'openssl x509 -noout -fingerprint -sha256 -in /etc/pmg/pmg-api.pem'
299 );
2930bbb2
SI
300 my $fp;
301 eval {
302 PVE::Tools::run_command($ssh_cmd, outfunc => sub {
303 my ($line) = @_;
304 if ($line =~ m/SHA256 Fingerprint=((?:[A-Fa-f0-9]{2}:){31}[A-Fa-f0-9]{2})/) {
305 $fp = $1;
306 }
307 });
308 die "parsing failed\n" if !$fp;
309 };
310 die "unable to get remote node fingerprint from '$ni->{name}': $@\n" if $@;
311
312 return $fp;
313}
314
b61378e1
SI
315sub trigger_update_fingerprints {
316 my ($cinfo) = @_;
317
318 my $master = $cinfo->{master} || die "unable to lookup master node\n";
0b0c58e6 319 my $cached_fp = { $master->{fingerprint} => 1 };
b61378e1
SI
320
321 # if running on master the current fingerprint for the API-connection is needed
0b0c58e6 322 # in addition (to prevent races with restarting pmgproxy
b61378e1 323 if ($cinfo->{local}->{type} eq 'master') {
0b0c58e6
SI
324 my $new_fp = PMG::Cluster::read_local_ssl_cert_fingerprint();
325 $cached_fp->{$new_fp} = 1;
b61378e1
SI
326 }
327
328 my $ticket = PMG::Ticket::assemble_ticket('root@pam');
329 my $csrftoken = PMG::Ticket::assemble_csrf_prevention_token('root@pam');
330 my $conn = PVE::APIClient::LWP->new(
331 ticket => $ticket,
332 csrftoken => $csrftoken,
333 cookie_name => 'PMGAuthCookie',
334 host => $master->{ip},
0b0c58e6
SI
335 cached_fingerprints => $cached_fp,
336 );
b61378e1
SI
337
338 $conn->post("/config/cluster/update-fingerprints", {});
339 return undef;
340}
341
c9dae0df
DM
342my $rsync_command = sub {
343 my ($host_key_alias, @args) = @_;
344
8b455bef 345 my $ssh_cmd = join(' ', @{$ssh_command->($host_key_alias)});
c9dae0df 346
8b455bef 347 my $cmd = ['rsync', "--rsh=$ssh_cmd", '-q', @args];
c9dae0df
DM
348
349 return $cmd;
350};
351
352sub sync_quarantine_files {
89eefd36 353 my ($host_ip, $host_name, $flistname, $rcid) = @_;
c9dae0df 354
9430b6d4
DM
355 my $spooldir = $PMG::MailQueue::spooldir;
356
89eefd36
DM
357 mkdir "$spooldir/cluster/";
358 my $syncdir = "$spooldir/cluster/$rcid";
359 mkdir $syncdir;
360
c9dae0df 361 my $cmd = $rsync_command->(
4daf3a35 362 $host_name, '--timeout', '10', "[${host_ip}]:$spooldir", $spooldir,
c9dae0df
DM
363 '--files-from', $flistname);
364
9430b6d4 365 PVE::Tools::run_command($cmd);
c9dae0df
DM
366}
367
368sub sync_spooldir {
369 my ($host_ip, $host_name, $rcid) = @_;
370
9430b6d4
DM
371 my $spooldir = $PMG::MailQueue::spooldir;
372
c9dae0df
DM
373 mkdir "$spooldir/cluster/";
374 my $syncdir = "$spooldir/cluster/$rcid";
375 mkdir $syncdir;
376
377 my $cmd = $rsync_command->(
4daf3a35 378 $host_name, '-aq', '--timeout', '10', "[${host_ip}]:$syncdir/", $syncdir);
c9dae0df
DM
379
380 foreach my $incl (('spam/', 'spam/*', 'spam/*/*', 'virus/', 'virus/*', 'virus/*/*')) {
381 push @$cmd, '--include', $incl;
382 }
383
384 push @$cmd, '--exclude', '*';
385
386 PVE::Tools::run_command($cmd);
387}
388
389sub sync_master_quar {
390 my ($host_ip, $host_name) = @_;
391
9430b6d4
DM
392 my $spooldir = $PMG::MailQueue::spooldir;
393
c9dae0df
DM
394 my $syncdir = "$spooldir/cluster/";
395 mkdir $syncdir;
396
397 my $cmd = $rsync_command->(
4daf3a35 398 $host_name, '-aq', '--timeout', '10', "[${host_ip}]:$syncdir", $syncdir);
c9dae0df
DM
399
400 PVE::Tools::run_command($cmd);
401}
402
58072364 403sub sync_config_from_master {
809ae8f4 404 my ($master_name, $master_ip, $noreload) = @_;
58072364 405
58072364 406 mkdir $syncdir;
a7c7cad7 407 File::Path::remove_tree($syncdir, {keep_root => 1});
58072364 408
a7c7cad7
DM
409 my $sa_conf_dir = "/etc/mail/spamassassin";
410 my $sa_custom_cf = "custom.cf";
957d0882 411 my $sa_rules_cf = "pmg-scores.cf";
58072364 412
c9dae0df 413 my $cmd = $rsync_command->(
f4b7112b 414 $master_name, '-aq',
957d0882 415 "[${master_ip}]:$cfgdir/* ${sa_conf_dir}/${sa_custom_cf} ${sa_conf_dir}/${sa_rules_cf}",
c9dae0df 416 "$syncdir/",
f4b7112b 417 '--exclude', 'master/',
c9dae0df
DM
418 '--exclude', '*~',
419 '--exclude', '*.db',
420 '--exclude', 'pmg-api.pem',
421 '--exclude', 'pmg-tls.pem',
422 );
58072364
DM
423
424 my $errmsg = "syncing master configuration from '${master_ip}' failed";
425 PVE::Tools::run_command($cmd, errmsg => $errmsg);
a7c7cad7
DM
426
427 # verify that the remote host is cluster master
428 open (my $fh, '<', "$syncdir/cluster.conf") ||
429 die "unable to open synced cluster.conf - $!\n";
a7c7cad7 430
809ae8f4
DM
431 my $cinfo = PMG::ClusterConfig::read_cluster_conf('cluster.conf', $fh);
432
433 if (!$cinfo->{master} || ($cinfo->{master}->{ip} ne $master_ip)) {
a7c7cad7
DM
434 die "host '$master_ip' is not cluster master\n";
435 }
436
809ae8f4
DM
437 my $role = $cinfo->{'local'}->{type} // '-';
438 die "local node '$cinfo->{local}->{name}' not part of cluster\n"
a7c7cad7
DM
439 if $role eq '-';
440
809ae8f4 441 die "local node '$cinfo->{local}->{name}' is new cluster master\n"
a7c7cad7
DM
442 if $role eq 'master';
443
a7c7cad7 444 $cond_commit_synced_file->('cluster.conf');
a7c7cad7 445
809dd92a
DM
446 update_ssh_keys($cinfo); # rewrite ssh keys
447
0757859a
DM
448 PMG::Fetchmail::update_fetchmail_default(0); # disable on slave
449
a7c7cad7
DM
450 my $files = [
451 'pmg-authkey.key',
452 'pmg-authkey.pub',
453 'pmg-csrf.key',
454 'ldap.conf',
455 'user.conf',
7cac3e28
DM
456 'domains',
457 'mynetworks',
458 'transport',
959aaeba 459 'tls_policy',
fd6feef4 460 'fetchmailrc',
a7c7cad7
DM
461 ];
462
463 foreach my $filename (@$files) {
464 $cond_commit_synced_file->($filename);
465 }
466
06cebf83
SI
467 my $dirs = [
468 'templates',
469 'dkim',
2cfdd9a1 470 'pbs',
6a477857 471 'acme',
06cebf83
SI
472 ];
473
474 foreach my $dir (@$dirs) {
475 my $srcdir = "$syncdir/$dir";
476
477 if ( -d $srcdir ) {
478 my $cmd = ['rsync', '-aq', '--delete-after', "$srcdir/", "$cfgdir/$dir"];
479 PVE::Tools::run_command($cmd);
480 }
481
482 }
0757859a 483
a7c7cad7
DM
484 my $force_restart = {};
485
957d0882
DC
486 for my $file (($sa_custom_cf, $sa_rules_cf)) {
487 if ($cond_commit_synced_file->($file, "${sa_conf_dir}/${file}")) {
488 $force_restart->{'pmg-smtp-filter'} = 1;
489 }
a7c7cad7
DM
490 }
491
492 $cond_commit_synced_file->('pmg.conf');
f4b7112b 493
5314a3a9 494 return $force_restart;
58072364
DM
495}
496
db303db4
DM
497sub sync_ruledb_from_master {
498 my ($ldb, $rdb, $ni, $ticket) = @_;
499
500 my $ruledb = PMG::RuleDB->new($ldb);
501 my $rulecache = PMG::RuleCache->new($ruledb);
502
503 my $conn = PVE::APIClient::LWP->new(
504 ticket => $ticket,
505 cookie_name => 'PMGAuthCookie',
506 host => $ni->{ip},
507 cached_fingerprints => {
508 $ni->{fingerprint} => 1,
509 });
510
511 my $digest = $conn->get("/config/ruledb/digest", {});
512
513 return if $digest eq $rulecache->{digest}; # no changes
514
515 syslog('info', "detected rule database changes - starting sync from '$ni->{ip}'");
516
517 eval {
518 $ldb->begin_work;
519
520 $ldb->do("DELETE FROM Rule");
521 $ldb->do("DELETE FROM RuleGroup");
522 $ldb->do("DELETE FROM ObjectGroup");
523 $ldb->do("DELETE FROM Object");
524 $ldb->do("DELETE FROM Attribut");
525
526 eval {
527 $rdb->begin_work;
528
529 # read a consistent snapshot
530 $rdb->do("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
531
532 PMG::DBTools::copy_table($ldb, $rdb, "Rule");
533 PMG::DBTools::copy_table($ldb, $rdb, "RuleGroup");
534 PMG::DBTools::copy_table($ldb, $rdb, "ObjectGroup");
535 PMG::DBTools::copy_table($ldb, $rdb, "Object", 'value');
536 PMG::DBTools::copy_table($ldb, $rdb, "Attribut", 'value');
537 };
538
539 $rdb->rollback; # end transaction
540
541 die $@ if $@;
542
543 # update sequences
544
545 $ldb->do("SELECT setval('rule_id_seq', max(id)+1) FROM Rule");
546 $ldb->do("SELECT setval('object_id_seq', max(id)+1) FROM Object");
547 $ldb->do("SELECT setval('objectgroup_id_seq', max(id)+1) FROM ObjectGroup");
548
549 $ldb->commit;
550 };
551 if (my $err = $@) {
552 $ldb->rollback;
553 die $err;
554 }
555
e29bdb0a
SI
556 PMG::DBTools::reload_ruledb();
557
db303db4
DM
558 syslog('info', "finished rule database sync from host '$ni->{ip}'");
559}
560
9430b6d4
DM
561sub sync_quarantine_db {
562 my ($ldb, $rdb, $ni, $rsynctime_ref) = @_;
563
564 my $rcid = $ni->{cid};
565
566 my $maxmails = 100000;
567
568 my $mscount = 0;
569
570 my $ctime = PMG::DBTools::get_remote_time($rdb);
571
572 my $maxcount = 1000;
573
574 my $count;
575
576 PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastid_CMailStore', -1, undef);
577
578 do { # get new values
579
580 $count = 0;
581
582 my $flistname = "/tmp/quarantinefilelist.$$";
583
584 eval {
585 $ldb->begin_work;
586
587 open(my $flistfh, '>', $flistname) ||
588 die "unable to open file '$flistname' - $!\n";
589
590 my $lastid = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastid_CMailStore');
591
592 # sync CMailStore
593
594 my $sth = $rdb->prepare(
595 "SELECT * from CMailstore WHERE cid = ? AND rid > ? " .
596 "ORDER BY cid,rid LIMIT ?");
597 $sth->execute($rcid, $lastid, $maxcount);
598
599 my $maxid;
600 my $callback = sub {
601 my $ref = shift;
602 $maxid = $ref->{rid};
89eefd36
DM
603 my $filename = $ref->{file};
604 # skip files generated before cluster was created
605 return if $filename !~ m!^cluster/!;
606 print $flistfh "$filename\n";
9430b6d4
DM
607 };
608
609 my $attrs = [qw(cid rid time qtype bytes spamlevel info sender header file)];
610 $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CMailStore', $attrs, $callback);
611
612 close($flistfh);
613
614 my $starttime = [ gettimeofday() ];
89eefd36 615 sync_quarantine_files($ni->{ip}, $ni->{name}, $flistname, $rcid);
9430b6d4
DM
616 $$rsynctime_ref += tv_interval($starttime);
617
618 if ($maxid) {
619 # sync CMSReceivers
620
621 $sth = $rdb->prepare(
622 "SELECT * from CMSReceivers WHERE " .
623 "CMailStore_CID = ? AND CMailStore_RID > ? " .
624 "AND CMailStore_RID <= ?");
625 $sth->execute($rcid, $lastid, $maxid);
626
666b5e8f 627 $attrs = [qw(cmailstore_cid cmailstore_rid pmail receiver ticketid status mtime)];
9430b6d4
DM
628 PMG::DBTools::copy_selected_data($ldb, $sth, 'CMSReceivers', $attrs);
629
630 PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastid_CMailStore', $maxid);
631 }
632
633 $ldb->commit;
634 };
635 my $err = $@;
636
637 unlink $flistname;
638
639 if ($err) {
640 $ldb->rollback;
641 die $err;
642 }
643
644 $mscount += $count;
645
d4c2f646 646 } while (($count >= $maxcount) && ($mscount < $maxmails));
9430b6d4
DM
647
648 PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastmt_CMSReceivers', 0, undef);
649
650 eval { # synchronize status updates
651 $ldb->begin_work;
652
653 my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers');
654
655 my $sth = $rdb->prepare ("SELECT * from CMSReceivers WHERE mtime >= ? AND status != 'N'");
656 $sth->execute($lastmt);
657
658 my $update_sth = $ldb->prepare(
659 "UPDATE CMSReceivers SET status = ? WHERE " .
666b5e8f 660 "CMailstore_CID = ? AND CMailstore_RID = ? AND TicketID = ?");
9430b6d4
DM
661 while (my $ref = $sth->fetchrow_hashref()) {
662 $update_sth->execute($ref->{status}, $ref->{cmailstore_cid},
666b5e8f 663 $ref->{cmailstore_rid}, $ref->{ticketid});
9430b6d4
DM
664 }
665
666 PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers', $ctime);
667
668 $ldb->commit;
669 };
670 if (my $err = $@) {
671 $ldb->rollback;
672 die $err;
673 }
674
675 return $mscount;
676}
677
678sub sync_statistic_db {
679 my ($ldb, $rdb, $ni) = @_;
680
681 my $rcid = $ni->{cid};
682
683 my $maxmails = 100000;
684
685 my $mscount = 0;
686
687 my $maxcount = 1000;
688
689 my $count;
690
691 PMG::DBTools::create_clusterinfo_default(
692 $ldb, $rcid, 'lastid_CStatistic', -1, undef);
693
694 do { # get new values
695
696 $count = 0;
697
698 eval {
699 $ldb->begin_work;
700
701 my $lastid = PMG::DBTools::read_int_clusterinfo(
702 $ldb, $rcid, 'lastid_CStatistic');
703
704 # sync CStatistic
705
706 my $sth = $rdb->prepare(
707 "SELECT * from CStatistic " .
708 "WHERE cid = ? AND rid > ? " .
709 "ORDER BY cid, rid LIMIT ?");
710 $sth->execute($rcid, $lastid, $maxcount);
711
712 my $maxid;
713 my $callback = sub {
714 my $ref = shift;
715 $maxid = $ref->{rid};
716 };
717
718 my $attrs = [qw(cid rid time bytes direction spamlevel ptime virusinfo sender)];
719 $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CStatistic', $attrs, $callback);
720
721 if ($maxid) {
722 # sync CReceivers
723
724 $sth = $rdb->prepare(
725 "SELECT * from CReceivers WHERE " .
726 "CStatistic_CID = ? AND CStatistic_RID > ? AND CStatistic_RID <= ?");
727 $sth->execute($rcid, $lastid, $maxid);
728
729 $attrs = [qw(cstatistic_cid cstatistic_rid blocked receiver)];
730 PMG::DBTools::copy_selected_data($ldb, $sth, 'CReceivers', $attrs);
9430b6d4
DM
731 }
732
64b7a2ea
DM
733 PMG::DBTools::write_maxint_clusterinfo ($ldb, $rcid, 'lastid_CStatistic', $maxid);
734
9430b6d4
DM
735 $ldb->commit;
736 };
737 if (my $err = $@) {
738 $ldb->rollback;
739 die $err;
740 }
741
742 $mscount += $count;
743
d4c2f646 744 } while (($count >= $maxcount) && ($mscount < $maxmails));
e86b85a3
DM
745
746 return $mscount;
9430b6d4
DM
747}
748
986eec31 749my $sync_generic_mtime_db = sub {
9430b6d4
DM
750 my ($ldb, $rdb, $ni, $table, $selectfunc, $mergefunc) = @_;
751
9430b6d4
DM
752 my $ctime = PMG::DBTools::get_remote_time($rdb);
753
754 PMG::DBTools::create_clusterinfo_default($ldb, $ni->{cid}, "lastmt_$table", 0, undef);
755
756 my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $ni->{cid}, "lastmt_$table");
757
4c93256a 758 my $sql_cmd = $selectfunc->($ctime, $lastmt);
9430b6d4 759
4c93256a 760 my $sth = $rdb->prepare($sql_cmd);
9430b6d4
DM
761
762 $sth->execute();
763
986eec31
DM
764 my $updates = 0;
765
4c93256a
DM
766 eval {
767 # use transaction to speedup things
768 my $max = 1000; # UPDATE MAX ENTRIES AT ONCE
4c93256a
DM
769 my $count = 0;
770 while (my $ref = $sth->fetchrow_hashref()) {
9d5bdd2a
DM
771 $ldb->begin_work if !$count;
772 $mergefunc->($ref);
4c93256a
DM
773 if (++$count >= $max) {
774 $count = 0;
775 $ldb->commit;
9430b6d4 776 }
986eec31 777 $updates++;
9430b6d4 778 }
9d5bdd2a
DM
779
780 $ldb->commit if $count;
9430b6d4 781 };
4c93256a
DM
782 if (my $err = $@) {
783 $ldb->rollback;
784 die $err;
9430b6d4
DM
785 }
786
9430b6d4
DM
787 PMG::DBTools::write_maxint_clusterinfo($ldb, $ni->{cid}, "lastmt_$table", $ctime);
788
986eec31
DM
789 return $updates;
790};
9430b6d4 791
5e1408fd
DM
792sub sync_localstat_db {
793 my ($dbh, $rdb, $ni) = @_;
794
795 my $rcid = $ni->{cid};
796
797 my $selectfunc = sub {
798 my ($ctime, $lastmt) = @_;
799 return "SELECT * from LocalStat WHERE mtime >= $lastmt AND cid = $rcid";
800 };
801
802 my $merge_sth = $dbh->prepare(
ebd19c79
DM
803 'INSERT INTO LocalStat (Time, RBLCount, PregreetCount, CID, MTime) ' .
804 'VALUES (?, ?, ?, ?, ?) ' .
5e1408fd 805 'ON CONFLICT (Time, CID) DO UPDATE SET ' .
ebd19c79 806 'RBLCount = excluded.RBLCount, PregreetCount = excluded.PregreetCount, MTime = excluded.MTime');
5e1408fd
DM
807
808 my $mergefunc = sub {
809 my ($ref) = @_;
810
ebd19c79 811 $merge_sth->execute($ref->{time}, $ref->{rblcount}, $ref->{pregreetcount}, $ref->{cid}, $ref->{mtime});
5e1408fd
DM
812 };
813
814 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'LocalStat', $selectfunc, $mergefunc);
815}
816
9430b6d4
DM
817sub sync_greylist_db {
818 my ($dbh, $rdb, $ni) = @_;
819
820 my $selectfunc = sub {
821 my ($ctime, $lastmt) = @_;
822 return "SELECT * from CGreylist WHERE extime >= $ctime AND " .
823 "mtime >= $lastmt AND CID != 0";
824 };
825
f61d5489 826 my $merge_sth = $dbh->prepare(PMG::DBTools::cgreylist_merge_sql());
9430b6d4 827 my $mergefunc = sub {
986eec31 828 my ($ref) = @_;
9430b6d4 829
f61d5489
SI
830 my $ipnet = $ref->{ipnet};
831 $ipnet .= '.0/24' if $ipnet !~ /\/\d+$/;
f413f920 832 $merge_sth->execute(
4e5d7fd8 833 $ipnet, $ref->{sender}, $ref->{receiver},
f413f920
DM
834 $ref->{instance}, $ref->{rctime}, $ref->{extime}, $ref->{delay},
835 $ref->{blocked}, $ref->{passed}, 0, $ref->{cid});
9430b6d4
DM
836 };
837
986eec31 838 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'CGreylist', $selectfunc, $mergefunc);
9430b6d4
DM
839}
840
841sub sync_userprefs_db {
842 my ($dbh, $rdb, $ni) = @_;
843
844 my $selectfunc = sub {
845 my ($ctime, $lastmt) = @_;
846
847 return "SELECT * from UserPrefs WHERE mtime >= $lastmt";
848 };
849
7d13157e 850 my $merge_sth = $dbh->prepare(
471b05ed 851 "INSERT INTO UserPrefs (PMail, Name, Data, MTime) " .
286bc590 852 'VALUES (?, ?, ?, ?) ' .
471b05ed 853 'ON CONFLICT (PMail, Name) DO UPDATE SET ' .
dbdf1298 854 # Note: MTime = 0 ==> this is just a copy from somewhere else, not modified
3a9ffd85 855 'MTime = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN 0 ELSE UserPrefs.MTime END, ' .
0527ea1a 856 'Data = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN excluded.Data ELSE UserPrefs.Data END');
471b05ed 857
9430b6d4 858 my $mergefunc = sub {
986eec31 859 my ($ref) = @_;
9430b6d4 860
286bc590 861 $merge_sth->execute($ref->{pmail}, $ref->{name}, $ref->{data}, $ref->{mtime});
9430b6d4
DM
862 };
863
986eec31 864 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'UserPrefs', $selectfunc, $mergefunc);
9430b6d4
DM
865}
866
867sub sync_domainstat_db {
868 my ($dbh, $rdb, $ni) = @_;
869
870 my $selectfunc = sub {
871 my ($ctime, $lastmt) = @_;
872 return "SELECT * from DomainStat WHERE mtime >= $lastmt";
873 };
874
1c7ea32c
DM
875 my $merge_sth = $dbh->prepare(
876 'INSERT INTO Domainstat ' .
877 '(Time,Domain,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
878 'BouncesIn,BouncesOut,PTimeSum,Mtime) ' .
879 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
880 'ON CONFLICT (Time, Domain) DO UPDATE SET ' .
881 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
882 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
883 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
884 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
885 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
886 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
887
9430b6d4 888 my $mergefunc = sub {
986eec31 889 my ($ref) = @_;
9430b6d4 890
1c7ea32c
DM
891 $merge_sth->execute(
892 $ref->{time}, $ref->{domain}, $ref->{countin}, $ref->{countout},
893 $ref->{bytesin}, $ref->{bytesout},
894 $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout},
895 $ref->{bouncesin}, $ref->{bouncesout}, $ref->{ptimesum}, $ref->{mtime});
9430b6d4
DM
896 };
897
986eec31 898 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DomainStat', $selectfunc, $mergefunc);
9430b6d4
DM
899}
900
901sub sync_dailystat_db {
902 my ($dbh, $rdb, $ni) = @_;
903
904 my $selectfunc = sub {
905 my ($ctime, $lastmt) = @_;
906 return "SELECT * from DailyStat WHERE mtime >= $lastmt";
8bf584ae
DM
907 };
908
909 my $merge_sth = $dbh->prepare(
910 'INSERT INTO DailyStat ' .
911 '(Time,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
912 'BouncesIn,BouncesOut,GreylistCount,SPFCount,RBLCount,PTimeSum,Mtime) ' .
913 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
914 'ON CONFLICT (Time) DO UPDATE SET ' .
915 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
916 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
917 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
918 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
919 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
920 'GreylistCount = excluded.GreylistCount, SPFCount = excluded.SpfCount, ' .
921 'RBLCount = excluded.RBLCount, ' .
922 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
9430b6d4
DM
923
924 my $mergefunc = sub {
986eec31 925 my ($ref) = @_;
9430b6d4 926
dbdf1298 927 $merge_sth->execute(
8bf584ae
DM
928 $ref->{time}, $ref->{countin}, $ref->{countout},
929 $ref->{bytesin}, $ref->{bytesout},
930 $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout},
931 $ref->{bouncesin}, $ref->{bouncesout}, $ref->{greylistcount},
932 $ref->{spfcount}, $ref->{rblcount}, $ref->{ptimesum}, $ref->{mtime});
9430b6d4
DM
933 };
934
986eec31 935 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DailyStat', $selectfunc, $mergefunc);
9430b6d4
DM
936}
937
938sub sync_virusinfo_db {
939 my ($dbh, $rdb, $ni) = @_;
940
941 my $selectfunc = sub {
942 my ($ctime, $lastmt) = @_;
943 return "SELECT * from VirusInfo WHERE mtime >= $lastmt";
944 };
945
80615ad6
DM
946 my $merge_sth = $dbh->prepare(
947 'INSERT INTO VirusInfo (Time,Name,Count,MTime) ' .
948 'VALUES (?,?,?,?) ' .
949 'ON CONFLICT (Time,Name) DO UPDATE SET ' .
950 'Count = excluded.Count , MTime = excluded.MTime');
951
9430b6d4 952 my $mergefunc = sub {
986eec31 953 my ($ref) = @_;
9430b6d4 954
80615ad6 955 $merge_sth->execute($ref->{time}, $ref->{name}, $ref->{count}, $ref->{mtime});
9430b6d4
DM
956 };
957
986eec31 958 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'VirusInfo', $selectfunc, $mergefunc);
9430b6d4
DM
959}
960
961sub sync_deleted_nodes_from_master {
962 my ($ldb, $masterdb, $cinfo, $masterni, $rsynctime_ref) = @_;
963
964 my $rsynctime = 0;
965
966 my $cid_hash = {}; # fast lookup
5c6e6eeb 967 foreach my $ni (values %{$cinfo->{ids}}) {
9430b6d4
DM
968 $cid_hash->{$ni->{cid}} = $ni;
969 }
970
971 my $spooldir = $PMG::MailQueue::spooldir;
972
5c6e6eeb
DM
973 my $maxcid = $cinfo->{master}->{maxcid} // 0;
974
975 for (my $rcid = 1; $rcid <= $maxcid; $rcid++) {
9430b6d4
DM
976 next if $cid_hash->{$rcid};
977
978 my $done_marker = "$spooldir/cluster/$rcid/.synced-deleted-node";
979
980 next if -f $done_marker; # already synced
981
982 syslog('info', "syncing deleted node $rcid from master '$masterni->{ip}'");
983
984 my $starttime = [ gettimeofday() ];
985 sync_spooldir($masterni->{ip}, $masterni->{name}, $rcid);
986 $$rsynctime_ref += tv_interval($starttime);
987
988 my $fake_ni = {
989 ip => $masterni->{ip},
990 name => $masterni->{name},
991 cid => $rcid,
992 };
993
994 sync_quarantine_db($ldb, $masterdb, $fake_ni);
995
996 sync_statistic_db ($ldb, $masterdb, $fake_ni);
997
998 open(my $fh, ">>", $done_marker);
999 }
1000}
1001
1002
0854fb22 10031;