]> git.proxmox.com Git - pmg-api.git/blame - src/PMG/Cluster.pm
cluster: refactor ssh pubkey verification
[pmg-api.git] / src / PMG / Cluster.pm
CommitLineData
0854fb22
DM
1package PMG::Cluster;
2
3use strict;
4use warnings;
45e68618 5use Data::Dumper;
0854fb22 6use Socket;
cfdf6608 7use File::Path;
9430b6d4 8use Time::HiRes qw (gettimeofday tv_interval);
45e68618 9
a7c7cad7 10use PVE::SafeSyslog;
0854fb22
DM
11use PVE::Tools;
12use PVE::INotify;
0757859a 13use PVE::APIClient::LWP;
b0d26b8f 14use PVE::Network;
0854fb22 15
1fb2ab76 16use PMG::Utils;
a7c7cad7 17use PMG::Config;
9f67f5b3 18use PMG::ClusterConfig;
db303db4
DM
19use PMG::RuleDB;
20use PMG::RuleCache;
9430b6d4 21use PMG::MailQueue;
0757859a 22use PMG::Fetchmail;
c56a7321 23use PMG::Ticket;
0854fb22 24
8737f93a
DM
25sub remote_node_ip {
26 my ($nodename, $noerr) = @_;
27
d8782874 28 my $cinfo = PMG::ClusterConfig->new();
8737f93a 29
45e68618 30 foreach my $entry (values %{$cinfo->{ids}}) {
8737f93a
DM
31 if ($entry->{name} eq $nodename) {
32 my $ip = $entry->{ip};
33 return $ip if !wantarray;
34 my $family = PVE::Tools::get_host_address_family($ip);
35 return ($ip, $family);
36 }
37 }
38
39 # fallback: try to get IP by other means
b0d26b8f 40 return PVE::Network::get_ip_from_hostname($nodename, $noerr);
8737f93a
DM
41}
42
d2e43f9e
DM
43sub get_master_node {
44 my ($cinfo) = @_;
45
d8782874 46 $cinfo = PMG::ClusterConfig->new() if !$cinfo;
d2e43f9e
DM
47
48 return $cinfo->{master}->{name} if defined($cinfo->{master});
49
50 return 'localhost';
51}
52
cba17aeb
DM
53sub read_local_ssl_cert_fingerprint {
54 my $cert_path = "/etc/pmg/pmg-api.pem";
0854fb22 55
cba17aeb
DM
56 my $cert;
57 eval {
58 my $bio = Net::SSLeay::BIO_new_file($cert_path, 'r');
59 $cert = Net::SSLeay::PEM_read_bio_X509($bio);
60 Net::SSLeay::BIO_free($bio);
61 };
62 if (my $err = $@) {
63 die "unable to read certificate '$cert_path' - $err\n";
64 }
0854fb22 65
cba17aeb
DM
66 if (!defined($cert)) {
67 die "unable to read certificate '$cert_path' - got empty value\n";
68 }
69
70 my $fp;
71 eval {
72 $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256');
73 };
74 if (my $err = $@) {
75 die "unable to get fingerprint for '$cert_path' - $err\n";
76 }
0854fb22 77
cba17aeb
DM
78 if (!defined($fp) || $fp eq '') {
79 die "unable to get fingerprint for '$cert_path' - got empty value\n";
80 }
0854fb22 81
cba17aeb
DM
82 return $fp;
83}
0854fb22 84
cba17aeb
DM
85my $hostrsapubkey_fn = '/etc/ssh/ssh_host_rsa_key.pub';
86my $rootrsakey_fn = '/root/.ssh/id_rsa';
87my $rootrsapubkey_fn = '/root/.ssh/id_rsa.pub';
0854fb22 88
cba17aeb
DM
89sub read_local_cluster_info {
90
91 my $res = {};
92
93 my $hostrsapubkey = PVE::Tools::file_read_firstline($hostrsapubkey_fn);
94 $hostrsapubkey =~ s/^.*ssh-rsa\s+//i;
95 $hostrsapubkey =~ s/\s+root\@\S+\s*$//i;
96
2a5ed7c1 97 my $sshpubkeypattern = PMG::ClusterConfig::Node::valid_ssh_pubkey();
cba17aeb 98 die "unable to parse ${hostrsapubkey_fn}\n"
2a5ed7c1 99 if $hostrsapubkey !~ m/$sshpubkeypattern/;
0854fb22
DM
100
101 my $nodename = PVE::INotify::nodename();
102
cba17aeb 103 $res->{name} = $nodename;
0854fb22 104
b0d26b8f 105 $res->{ip} = PVE::Network::get_ip_from_hostname($nodename);
0854fb22 106
cba17aeb 107 $res->{hostrsapubkey} = $hostrsapubkey;
0854fb22 108
cba17aeb
DM
109 if (! -f $rootrsapubkey_fn) {
110 unlink $rootrsakey_fn;
111 my $cmd = ['ssh-keygen', '-t', 'rsa', '-N', '', '-b', '2048',
112 '-f', $rootrsakey_fn];
1fb2ab76 113 PMG::Utils::run_silent_cmd($cmd);
cba17aeb
DM
114 }
115
116 my $rootrsapubkey = PVE::Tools::file_read_firstline($rootrsapubkey_fn);
117 $rootrsapubkey =~ s/^.*ssh-rsa\s+//i;
118 $rootrsapubkey =~ s/\s+root\@\S+\s*$//i;
119
120 die "unable to parse ${rootrsapubkey_fn}\n"
2a5ed7c1 121 if $rootrsapubkey !~ m/$sshpubkeypattern/;
cba17aeb
DM
122
123 $res->{rootrsapubkey} = $rootrsapubkey;
124
125 $res->{fingerprint} = read_local_ssl_cert_fingerprint();
126
127 return $res;
128}
129
130# X509 Certificate cache helper
131
132my $cert_cache_nodes = {};
133my $cert_cache_timestamp = time();
134my $cert_cache_fingerprints = {};
0854fb22 135
cba17aeb 136sub update_cert_cache {
0854fb22 137
cba17aeb
DM
138 $cert_cache_timestamp = time();
139
140 $cert_cache_fingerprints = {};
141 $cert_cache_nodes = {};
142
d8782874 143 my $cinfo = PMG::ClusterConfig->new();
cba17aeb
DM
144
145 foreach my $entry (values %{$cinfo->{ids}}) {
146 my $node = $entry->{name};
147 my $fp = $entry->{fingerprint};
148 if ($node && $fp) {
149 $cert_cache_fingerprints->{$fp} = 1;
150 $cert_cache_nodes->{$node} = $fp;
0854fb22
DM
151 }
152 }
153}
154
155# load and cache cert fingerprint once
156sub initialize_cert_cache {
157 my ($node) = @_;
158
cba17aeb 159 update_cert_cache()
0854fb22
DM
160 if defined($node) && !defined($cert_cache_nodes->{$node});
161}
162
163sub check_cert_fingerprint {
164 my ($cert) = @_;
165
166 # clear cache every 30 minutes at least
cba17aeb 167 update_cert_cache() if time() - $cert_cache_timestamp >= 60*30;
0854fb22
DM
168
169 # get fingerprint of server certificate
170 my $fp;
171 eval {
172 $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256');
173 };
174 return 0 if $@ || !defined($fp) || $fp eq ''; # error
175
176 my $check = sub {
177 for my $expected (keys %$cert_cache_fingerprints) {
178 return 1 if $fp eq $expected;
179 }
180 return 0;
181 };
182
cba17aeb 183 return 1 if $check->();
0854fb22
DM
184
185 # clear cache and retry at most once every minute
186 if (time() - $cert_cache_timestamp >= 60) {
187 syslog ('info', "Could not verify remote node certificate '$fp' with list of pinned certificates, refreshing cache");
188 update_cert_cache();
cba17aeb 189 return $check->();
0854fb22
DM
190 }
191
192 return 0;
193}
194
58072364
DM
195my $sshglobalknownhosts = "/etc/ssh/ssh_known_hosts2";
196my $rootsshauthkeys = "/root/.ssh/authorized_keys";
197my $ssh_rsa_id = "/root/.ssh/id_rsa.pub";
198
199sub update_ssh_keys {
200 my ($cinfo) = @_;
201
809dd92a 202 my $old = '';
58072364 203 my $data = '';
809dd92a 204
58072364
DM
205 foreach my $node (values %{$cinfo->{ids}}) {
206 $data .= "$node->{ip} ssh-rsa $node->{hostrsapubkey}\n";
207 $data .= "$node->{name} ssh-rsa $node->{hostrsapubkey}\n";
208 }
209
809dd92a
DM
210 $old = PVE::Tools::file_get_contents($sshglobalknownhosts, 1024*1024)
211 if -f $sshglobalknownhosts;
212
213 PVE::Tools::file_set_contents($sshglobalknownhosts, $data)
214 if $old ne $data;
58072364
DM
215
216 $data = '';
809dd92a 217 $old = '';
58072364
DM
218
219 # always add ourself
220 if (-f $ssh_rsa_id) {
221 my $pub = PVE::Tools::file_get_contents($ssh_rsa_id);
222 chomp($pub);
223 $data .= "$pub\n";
224 }
225
226 foreach my $node (values %{$cinfo->{ids}}) {
227 $data .= "ssh-rsa $node->{rootrsapubkey} root\@$node->{name}\n";
228 }
229
230 if (-f $rootsshauthkeys) {
a7c7cad7
DM
231 my $mykey = PVE::Tools::file_get_contents($rootsshauthkeys, 128*1024);
232 chomp($mykey);
233 $data .= "$mykey\n";
58072364
DM
234 }
235
236 my $newdata = "";
237 my $vhash = {};
238 my @lines = split(/\n/, $data);
239 foreach my $line (@lines) {
240 if ($line !~ /^#/ && $line =~ m/(^|\s)ssh-(rsa|dsa)\s+(\S+)\s+\S+$/) {
241 next if $vhash->{$3}++;
242 }
243 $newdata .= "$line\n";
244 }
245
809dd92a
DM
246 $old = PVE::Tools::file_get_contents($rootsshauthkeys, 1024*1024)
247 if -f $rootsshauthkeys;
248
249 PVE::Tools::file_set_contents($rootsshauthkeys, $newdata, 0600)
250 if $old ne $newdata;
58072364
DM
251}
252
a7c7cad7
DM
253my $cfgdir = '/etc/pmg';
254my $syncdir = "$cfgdir/master";
255
256my $cond_commit_synced_file = sub {
257 my ($filename, $dstfn) = @_;
258
259 $dstfn = "$cfgdir/$filename" if !defined($dstfn);
260 my $srcfn = "$syncdir/$filename";
261
262 if (! -f $srcfn) {
263 unlink $dstfn;
264 return;
265 }
266
267 my $new = PVE::Tools::file_get_contents($srcfn, 1024*1024);
268
269 if (-f $dstfn) {
270 my $old = PVE::Tools::file_get_contents($dstfn, 1024*1024);
271 return 0 if $new eq $old;
272 }
273
1dc946d4
DM
274 # set mtime (touch) to avoid time drift problems
275 utime(undef, undef, $srcfn);
276
a7c7cad7
DM
277 rename($srcfn, $dstfn) ||
278 die "cond_rename_file '$filename' failed - $!\n";
279
280 print STDERR "updated $dstfn\n";
281
282 return 1;
283};
284
8b455bef
SI
285my $ssh_command = sub {
286 my ($host_key_alias, @args) = @_;
287
288 my $cmd = ['ssh', '-l', 'root', '-o', 'BatchMode=yes'];
289 push @$cmd, '-o', "HostKeyAlias=${host_key_alias}" if $host_key_alias;
290 push @$cmd, @args if @args;
291 return $cmd;
292};
293
2930bbb2
SI
294sub get_remote_cert_fingerprint {
295 my ($ni) = @_;
296
297 my $ssh_cmd = $ssh_command->(
f9ee6324
TL
298 $ni->{name},
299 $ni->{ip},
300 'openssl x509 -noout -fingerprint -sha256 -in /etc/pmg/pmg-api.pem'
301 );
2930bbb2
SI
302 my $fp;
303 eval {
304 PVE::Tools::run_command($ssh_cmd, outfunc => sub {
305 my ($line) = @_;
306 if ($line =~ m/SHA256 Fingerprint=((?:[A-Fa-f0-9]{2}:){31}[A-Fa-f0-9]{2})/) {
307 $fp = $1;
308 }
309 });
310 die "parsing failed\n" if !$fp;
311 };
312 die "unable to get remote node fingerprint from '$ni->{name}': $@\n" if $@;
313
314 return $fp;
315}
316
b61378e1
SI
317sub trigger_update_fingerprints {
318 my ($cinfo) = @_;
319
320 my $master = $cinfo->{master} || die "unable to lookup master node\n";
0b0c58e6 321 my $cached_fp = { $master->{fingerprint} => 1 };
b61378e1
SI
322
323 # if running on master the current fingerprint for the API-connection is needed
0b0c58e6 324 # in addition (to prevent races with restarting pmgproxy
b61378e1 325 if ($cinfo->{local}->{type} eq 'master') {
0b0c58e6
SI
326 my $new_fp = PMG::Cluster::read_local_ssl_cert_fingerprint();
327 $cached_fp->{$new_fp} = 1;
b61378e1
SI
328 }
329
330 my $ticket = PMG::Ticket::assemble_ticket('root@pam');
331 my $csrftoken = PMG::Ticket::assemble_csrf_prevention_token('root@pam');
332 my $conn = PVE::APIClient::LWP->new(
333 ticket => $ticket,
334 csrftoken => $csrftoken,
335 cookie_name => 'PMGAuthCookie',
336 host => $master->{ip},
0b0c58e6
SI
337 cached_fingerprints => $cached_fp,
338 );
b61378e1
SI
339
340 $conn->post("/config/cluster/update-fingerprints", {});
341 return undef;
342}
343
c9dae0df
DM
344my $rsync_command = sub {
345 my ($host_key_alias, @args) = @_;
346
8b455bef 347 my $ssh_cmd = join(' ', @{$ssh_command->($host_key_alias)});
c9dae0df 348
8b455bef 349 my $cmd = ['rsync', "--rsh=$ssh_cmd", '-q', @args];
c9dae0df
DM
350
351 return $cmd;
352};
353
354sub sync_quarantine_files {
89eefd36 355 my ($host_ip, $host_name, $flistname, $rcid) = @_;
c9dae0df 356
9430b6d4
DM
357 my $spooldir = $PMG::MailQueue::spooldir;
358
89eefd36
DM
359 mkdir "$spooldir/cluster/";
360 my $syncdir = "$spooldir/cluster/$rcid";
361 mkdir $syncdir;
362
c9dae0df 363 my $cmd = $rsync_command->(
4daf3a35 364 $host_name, '--timeout', '10', "[${host_ip}]:$spooldir", $spooldir,
c9dae0df
DM
365 '--files-from', $flistname);
366
9430b6d4 367 PVE::Tools::run_command($cmd);
c9dae0df
DM
368}
369
370sub sync_spooldir {
371 my ($host_ip, $host_name, $rcid) = @_;
372
9430b6d4
DM
373 my $spooldir = $PMG::MailQueue::spooldir;
374
c9dae0df
DM
375 mkdir "$spooldir/cluster/";
376 my $syncdir = "$spooldir/cluster/$rcid";
377 mkdir $syncdir;
378
379 my $cmd = $rsync_command->(
4daf3a35 380 $host_name, '-aq', '--timeout', '10', "[${host_ip}]:$syncdir/", $syncdir);
c9dae0df
DM
381
382 foreach my $incl (('spam/', 'spam/*', 'spam/*/*', 'virus/', 'virus/*', 'virus/*/*')) {
383 push @$cmd, '--include', $incl;
384 }
385
386 push @$cmd, '--exclude', '*';
387
388 PVE::Tools::run_command($cmd);
389}
390
391sub sync_master_quar {
392 my ($host_ip, $host_name) = @_;
393
9430b6d4
DM
394 my $spooldir = $PMG::MailQueue::spooldir;
395
c9dae0df
DM
396 my $syncdir = "$spooldir/cluster/";
397 mkdir $syncdir;
398
399 my $cmd = $rsync_command->(
4daf3a35 400 $host_name, '-aq', '--timeout', '10', "[${host_ip}]:$syncdir", $syncdir);
c9dae0df
DM
401
402 PVE::Tools::run_command($cmd);
403}
404
58072364 405sub sync_config_from_master {
809ae8f4 406 my ($master_name, $master_ip, $noreload) = @_;
58072364 407
58072364 408 mkdir $syncdir;
a7c7cad7 409 File::Path::remove_tree($syncdir, {keep_root => 1});
58072364 410
a7c7cad7
DM
411 my $sa_conf_dir = "/etc/mail/spamassassin";
412 my $sa_custom_cf = "custom.cf";
957d0882 413 my $sa_rules_cf = "pmg-scores.cf";
58072364 414
c9dae0df 415 my $cmd = $rsync_command->(
f4b7112b 416 $master_name, '-aq',
957d0882 417 "[${master_ip}]:$cfgdir/* ${sa_conf_dir}/${sa_custom_cf} ${sa_conf_dir}/${sa_rules_cf}",
c9dae0df 418 "$syncdir/",
f4b7112b 419 '--exclude', 'master/',
c9dae0df
DM
420 '--exclude', '*~',
421 '--exclude', '*.db',
422 '--exclude', 'pmg-api.pem',
423 '--exclude', 'pmg-tls.pem',
424 );
58072364
DM
425
426 my $errmsg = "syncing master configuration from '${master_ip}' failed";
427 PVE::Tools::run_command($cmd, errmsg => $errmsg);
a7c7cad7
DM
428
429 # verify that the remote host is cluster master
430 open (my $fh, '<', "$syncdir/cluster.conf") ||
431 die "unable to open synced cluster.conf - $!\n";
a7c7cad7 432
809ae8f4
DM
433 my $cinfo = PMG::ClusterConfig::read_cluster_conf('cluster.conf', $fh);
434
435 if (!$cinfo->{master} || ($cinfo->{master}->{ip} ne $master_ip)) {
a7c7cad7
DM
436 die "host '$master_ip' is not cluster master\n";
437 }
438
809ae8f4
DM
439 my $role = $cinfo->{'local'}->{type} // '-';
440 die "local node '$cinfo->{local}->{name}' not part of cluster\n"
a7c7cad7
DM
441 if $role eq '-';
442
809ae8f4 443 die "local node '$cinfo->{local}->{name}' is new cluster master\n"
a7c7cad7
DM
444 if $role eq 'master';
445
a7c7cad7 446 $cond_commit_synced_file->('cluster.conf');
a7c7cad7 447
809dd92a
DM
448 update_ssh_keys($cinfo); # rewrite ssh keys
449
0757859a
DM
450 PMG::Fetchmail::update_fetchmail_default(0); # disable on slave
451
a7c7cad7
DM
452 my $files = [
453 'pmg-authkey.key',
454 'pmg-authkey.pub',
455 'pmg-csrf.key',
456 'ldap.conf',
457 'user.conf',
7cac3e28
DM
458 'domains',
459 'mynetworks',
460 'transport',
959aaeba 461 'tls_policy',
fd6feef4 462 'fetchmailrc',
a7c7cad7
DM
463 ];
464
465 foreach my $filename (@$files) {
466 $cond_commit_synced_file->($filename);
467 }
468
06cebf83
SI
469 my $dirs = [
470 'templates',
471 'dkim',
2cfdd9a1 472 'pbs',
6a477857 473 'acme',
06cebf83
SI
474 ];
475
476 foreach my $dir (@$dirs) {
477 my $srcdir = "$syncdir/$dir";
478
479 if ( -d $srcdir ) {
480 my $cmd = ['rsync', '-aq', '--delete-after', "$srcdir/", "$cfgdir/$dir"];
481 PVE::Tools::run_command($cmd);
482 }
483
484 }
0757859a 485
a7c7cad7
DM
486 my $force_restart = {};
487
957d0882
DC
488 for my $file (($sa_custom_cf, $sa_rules_cf)) {
489 if ($cond_commit_synced_file->($file, "${sa_conf_dir}/${file}")) {
490 $force_restart->{'pmg-smtp-filter'} = 1;
491 }
a7c7cad7
DM
492 }
493
494 $cond_commit_synced_file->('pmg.conf');
f4b7112b 495
5314a3a9 496 return $force_restart;
58072364
DM
497}
498
db303db4
DM
499sub sync_ruledb_from_master {
500 my ($ldb, $rdb, $ni, $ticket) = @_;
501
502 my $ruledb = PMG::RuleDB->new($ldb);
503 my $rulecache = PMG::RuleCache->new($ruledb);
504
505 my $conn = PVE::APIClient::LWP->new(
506 ticket => $ticket,
507 cookie_name => 'PMGAuthCookie',
508 host => $ni->{ip},
509 cached_fingerprints => {
510 $ni->{fingerprint} => 1,
511 });
512
513 my $digest = $conn->get("/config/ruledb/digest", {});
514
515 return if $digest eq $rulecache->{digest}; # no changes
516
517 syslog('info', "detected rule database changes - starting sync from '$ni->{ip}'");
518
519 eval {
520 $ldb->begin_work;
521
522 $ldb->do("DELETE FROM Rule");
523 $ldb->do("DELETE FROM RuleGroup");
524 $ldb->do("DELETE FROM ObjectGroup");
525 $ldb->do("DELETE FROM Object");
526 $ldb->do("DELETE FROM Attribut");
527
528 eval {
529 $rdb->begin_work;
530
531 # read a consistent snapshot
532 $rdb->do("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
533
534 PMG::DBTools::copy_table($ldb, $rdb, "Rule");
535 PMG::DBTools::copy_table($ldb, $rdb, "RuleGroup");
536 PMG::DBTools::copy_table($ldb, $rdb, "ObjectGroup");
537 PMG::DBTools::copy_table($ldb, $rdb, "Object", 'value');
538 PMG::DBTools::copy_table($ldb, $rdb, "Attribut", 'value');
539 };
540
541 $rdb->rollback; # end transaction
542
543 die $@ if $@;
544
545 # update sequences
546
547 $ldb->do("SELECT setval('rule_id_seq', max(id)+1) FROM Rule");
548 $ldb->do("SELECT setval('object_id_seq', max(id)+1) FROM Object");
549 $ldb->do("SELECT setval('objectgroup_id_seq', max(id)+1) FROM ObjectGroup");
550
551 $ldb->commit;
552 };
553 if (my $err = $@) {
554 $ldb->rollback;
555 die $err;
556 }
557
e29bdb0a
SI
558 PMG::DBTools::reload_ruledb();
559
db303db4
DM
560 syslog('info', "finished rule database sync from host '$ni->{ip}'");
561}
562
9430b6d4
DM
563sub sync_quarantine_db {
564 my ($ldb, $rdb, $ni, $rsynctime_ref) = @_;
565
566 my $rcid = $ni->{cid};
567
568 my $maxmails = 100000;
569
570 my $mscount = 0;
571
572 my $ctime = PMG::DBTools::get_remote_time($rdb);
573
574 my $maxcount = 1000;
575
576 my $count;
577
578 PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastid_CMailStore', -1, undef);
579
580 do { # get new values
581
582 $count = 0;
583
584 my $flistname = "/tmp/quarantinefilelist.$$";
585
586 eval {
587 $ldb->begin_work;
588
589 open(my $flistfh, '>', $flistname) ||
590 die "unable to open file '$flistname' - $!\n";
591
592 my $lastid = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastid_CMailStore');
593
594 # sync CMailStore
595
596 my $sth = $rdb->prepare(
597 "SELECT * from CMailstore WHERE cid = ? AND rid > ? " .
598 "ORDER BY cid,rid LIMIT ?");
599 $sth->execute($rcid, $lastid, $maxcount);
600
601 my $maxid;
602 my $callback = sub {
603 my $ref = shift;
604 $maxid = $ref->{rid};
89eefd36
DM
605 my $filename = $ref->{file};
606 # skip files generated before cluster was created
607 return if $filename !~ m!^cluster/!;
608 print $flistfh "$filename\n";
9430b6d4
DM
609 };
610
611 my $attrs = [qw(cid rid time qtype bytes spamlevel info sender header file)];
612 $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CMailStore', $attrs, $callback);
613
614 close($flistfh);
615
616 my $starttime = [ gettimeofday() ];
89eefd36 617 sync_quarantine_files($ni->{ip}, $ni->{name}, $flistname, $rcid);
9430b6d4
DM
618 $$rsynctime_ref += tv_interval($starttime);
619
620 if ($maxid) {
621 # sync CMSReceivers
622
623 $sth = $rdb->prepare(
624 "SELECT * from CMSReceivers WHERE " .
625 "CMailStore_CID = ? AND CMailStore_RID > ? " .
626 "AND CMailStore_RID <= ?");
627 $sth->execute($rcid, $lastid, $maxid);
628
666b5e8f 629 $attrs = [qw(cmailstore_cid cmailstore_rid pmail receiver ticketid status mtime)];
9430b6d4
DM
630 PMG::DBTools::copy_selected_data($ldb, $sth, 'CMSReceivers', $attrs);
631
632 PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastid_CMailStore', $maxid);
633 }
634
635 $ldb->commit;
636 };
637 my $err = $@;
638
639 unlink $flistname;
640
641 if ($err) {
642 $ldb->rollback;
643 die $err;
644 }
645
646 $mscount += $count;
647
d4c2f646 648 } while (($count >= $maxcount) && ($mscount < $maxmails));
9430b6d4
DM
649
650 PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastmt_CMSReceivers', 0, undef);
651
652 eval { # synchronize status updates
653 $ldb->begin_work;
654
655 my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers');
656
657 my $sth = $rdb->prepare ("SELECT * from CMSReceivers WHERE mtime >= ? AND status != 'N'");
658 $sth->execute($lastmt);
659
660 my $update_sth = $ldb->prepare(
661 "UPDATE CMSReceivers SET status = ? WHERE " .
666b5e8f 662 "CMailstore_CID = ? AND CMailstore_RID = ? AND TicketID = ?");
9430b6d4
DM
663 while (my $ref = $sth->fetchrow_hashref()) {
664 $update_sth->execute($ref->{status}, $ref->{cmailstore_cid},
666b5e8f 665 $ref->{cmailstore_rid}, $ref->{ticketid});
9430b6d4
DM
666 }
667
668 PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers', $ctime);
669
670 $ldb->commit;
671 };
672 if (my $err = $@) {
673 $ldb->rollback;
674 die $err;
675 }
676
677 return $mscount;
678}
679
680sub sync_statistic_db {
681 my ($ldb, $rdb, $ni) = @_;
682
683 my $rcid = $ni->{cid};
684
685 my $maxmails = 100000;
686
687 my $mscount = 0;
688
689 my $maxcount = 1000;
690
691 my $count;
692
693 PMG::DBTools::create_clusterinfo_default(
694 $ldb, $rcid, 'lastid_CStatistic', -1, undef);
695
696 do { # get new values
697
698 $count = 0;
699
700 eval {
701 $ldb->begin_work;
702
703 my $lastid = PMG::DBTools::read_int_clusterinfo(
704 $ldb, $rcid, 'lastid_CStatistic');
705
706 # sync CStatistic
707
708 my $sth = $rdb->prepare(
709 "SELECT * from CStatistic " .
710 "WHERE cid = ? AND rid > ? " .
711 "ORDER BY cid, rid LIMIT ?");
712 $sth->execute($rcid, $lastid, $maxcount);
713
714 my $maxid;
715 my $callback = sub {
716 my $ref = shift;
717 $maxid = $ref->{rid};
718 };
719
720 my $attrs = [qw(cid rid time bytes direction spamlevel ptime virusinfo sender)];
721 $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CStatistic', $attrs, $callback);
722
723 if ($maxid) {
724 # sync CReceivers
725
726 $sth = $rdb->prepare(
727 "SELECT * from CReceivers WHERE " .
728 "CStatistic_CID = ? AND CStatistic_RID > ? AND CStatistic_RID <= ?");
729 $sth->execute($rcid, $lastid, $maxid);
730
731 $attrs = [qw(cstatistic_cid cstatistic_rid blocked receiver)];
732 PMG::DBTools::copy_selected_data($ldb, $sth, 'CReceivers', $attrs);
9430b6d4
DM
733 }
734
64b7a2ea
DM
735 PMG::DBTools::write_maxint_clusterinfo ($ldb, $rcid, 'lastid_CStatistic', $maxid);
736
9430b6d4
DM
737 $ldb->commit;
738 };
739 if (my $err = $@) {
740 $ldb->rollback;
741 die $err;
742 }
743
744 $mscount += $count;
745
d4c2f646 746 } while (($count >= $maxcount) && ($mscount < $maxmails));
e86b85a3
DM
747
748 return $mscount;
9430b6d4
DM
749}
750
986eec31 751my $sync_generic_mtime_db = sub {
9430b6d4
DM
752 my ($ldb, $rdb, $ni, $table, $selectfunc, $mergefunc) = @_;
753
9430b6d4
DM
754 my $ctime = PMG::DBTools::get_remote_time($rdb);
755
756 PMG::DBTools::create_clusterinfo_default($ldb, $ni->{cid}, "lastmt_$table", 0, undef);
757
758 my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $ni->{cid}, "lastmt_$table");
759
4c93256a 760 my $sql_cmd = $selectfunc->($ctime, $lastmt);
9430b6d4 761
4c93256a 762 my $sth = $rdb->prepare($sql_cmd);
9430b6d4
DM
763
764 $sth->execute();
765
986eec31
DM
766 my $updates = 0;
767
4c93256a
DM
768 eval {
769 # use transaction to speedup things
770 my $max = 1000; # UPDATE MAX ENTRIES AT ONCE
4c93256a
DM
771 my $count = 0;
772 while (my $ref = $sth->fetchrow_hashref()) {
9d5bdd2a
DM
773 $ldb->begin_work if !$count;
774 $mergefunc->($ref);
4c93256a
DM
775 if (++$count >= $max) {
776 $count = 0;
777 $ldb->commit;
9430b6d4 778 }
986eec31 779 $updates++;
9430b6d4 780 }
9d5bdd2a
DM
781
782 $ldb->commit if $count;
9430b6d4 783 };
4c93256a
DM
784 if (my $err = $@) {
785 $ldb->rollback;
786 die $err;
9430b6d4
DM
787 }
788
9430b6d4
DM
789 PMG::DBTools::write_maxint_clusterinfo($ldb, $ni->{cid}, "lastmt_$table", $ctime);
790
986eec31
DM
791 return $updates;
792};
9430b6d4 793
5e1408fd
DM
794sub sync_localstat_db {
795 my ($dbh, $rdb, $ni) = @_;
796
797 my $rcid = $ni->{cid};
798
799 my $selectfunc = sub {
800 my ($ctime, $lastmt) = @_;
801 return "SELECT * from LocalStat WHERE mtime >= $lastmt AND cid = $rcid";
802 };
803
804 my $merge_sth = $dbh->prepare(
ebd19c79
DM
805 'INSERT INTO LocalStat (Time, RBLCount, PregreetCount, CID, MTime) ' .
806 'VALUES (?, ?, ?, ?, ?) ' .
5e1408fd 807 'ON CONFLICT (Time, CID) DO UPDATE SET ' .
ebd19c79 808 'RBLCount = excluded.RBLCount, PregreetCount = excluded.PregreetCount, MTime = excluded.MTime');
5e1408fd
DM
809
810 my $mergefunc = sub {
811 my ($ref) = @_;
812
ebd19c79 813 $merge_sth->execute($ref->{time}, $ref->{rblcount}, $ref->{pregreetcount}, $ref->{cid}, $ref->{mtime});
5e1408fd
DM
814 };
815
816 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'LocalStat', $selectfunc, $mergefunc);
817}
818
9430b6d4
DM
819sub sync_greylist_db {
820 my ($dbh, $rdb, $ni) = @_;
821
822 my $selectfunc = sub {
823 my ($ctime, $lastmt) = @_;
824 return "SELECT * from CGreylist WHERE extime >= $ctime AND " .
825 "mtime >= $lastmt AND CID != 0";
826 };
827
f61d5489 828 my $merge_sth = $dbh->prepare(PMG::DBTools::cgreylist_merge_sql());
9430b6d4 829 my $mergefunc = sub {
986eec31 830 my ($ref) = @_;
9430b6d4 831
f61d5489
SI
832 my $ipnet = $ref->{ipnet};
833 $ipnet .= '.0/24' if $ipnet !~ /\/\d+$/;
f413f920 834 $merge_sth->execute(
4e5d7fd8 835 $ipnet, $ref->{sender}, $ref->{receiver},
f413f920
DM
836 $ref->{instance}, $ref->{rctime}, $ref->{extime}, $ref->{delay},
837 $ref->{blocked}, $ref->{passed}, 0, $ref->{cid});
9430b6d4
DM
838 };
839
986eec31 840 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'CGreylist', $selectfunc, $mergefunc);
9430b6d4
DM
841}
842
843sub sync_userprefs_db {
844 my ($dbh, $rdb, $ni) = @_;
845
846 my $selectfunc = sub {
847 my ($ctime, $lastmt) = @_;
848
849 return "SELECT * from UserPrefs WHERE mtime >= $lastmt";
850 };
851
7d13157e 852 my $merge_sth = $dbh->prepare(
471b05ed 853 "INSERT INTO UserPrefs (PMail, Name, Data, MTime) " .
286bc590 854 'VALUES (?, ?, ?, ?) ' .
471b05ed 855 'ON CONFLICT (PMail, Name) DO UPDATE SET ' .
dbdf1298 856 # Note: MTime = 0 ==> this is just a copy from somewhere else, not modified
3a9ffd85 857 'MTime = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN 0 ELSE UserPrefs.MTime END, ' .
0527ea1a 858 'Data = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN excluded.Data ELSE UserPrefs.Data END');
471b05ed 859
9430b6d4 860 my $mergefunc = sub {
986eec31 861 my ($ref) = @_;
9430b6d4 862
286bc590 863 $merge_sth->execute($ref->{pmail}, $ref->{name}, $ref->{data}, $ref->{mtime});
9430b6d4
DM
864 };
865
986eec31 866 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'UserPrefs', $selectfunc, $mergefunc);
9430b6d4
DM
867}
868
869sub sync_domainstat_db {
870 my ($dbh, $rdb, $ni) = @_;
871
872 my $selectfunc = sub {
873 my ($ctime, $lastmt) = @_;
874 return "SELECT * from DomainStat WHERE mtime >= $lastmt";
875 };
876
1c7ea32c
DM
877 my $merge_sth = $dbh->prepare(
878 'INSERT INTO Domainstat ' .
879 '(Time,Domain,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
880 'BouncesIn,BouncesOut,PTimeSum,Mtime) ' .
881 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
882 'ON CONFLICT (Time, Domain) DO UPDATE SET ' .
883 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
884 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
885 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
886 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
887 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
888 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
889
9430b6d4 890 my $mergefunc = sub {
986eec31 891 my ($ref) = @_;
9430b6d4 892
1c7ea32c
DM
893 $merge_sth->execute(
894 $ref->{time}, $ref->{domain}, $ref->{countin}, $ref->{countout},
895 $ref->{bytesin}, $ref->{bytesout},
896 $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout},
897 $ref->{bouncesin}, $ref->{bouncesout}, $ref->{ptimesum}, $ref->{mtime});
9430b6d4
DM
898 };
899
986eec31 900 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DomainStat', $selectfunc, $mergefunc);
9430b6d4
DM
901}
902
903sub sync_dailystat_db {
904 my ($dbh, $rdb, $ni) = @_;
905
906 my $selectfunc = sub {
907 my ($ctime, $lastmt) = @_;
908 return "SELECT * from DailyStat WHERE mtime >= $lastmt";
8bf584ae
DM
909 };
910
911 my $merge_sth = $dbh->prepare(
912 'INSERT INTO DailyStat ' .
913 '(Time,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
914 'BouncesIn,BouncesOut,GreylistCount,SPFCount,RBLCount,PTimeSum,Mtime) ' .
915 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
916 'ON CONFLICT (Time) DO UPDATE SET ' .
917 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
918 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
919 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
920 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
921 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
922 'GreylistCount = excluded.GreylistCount, SPFCount = excluded.SpfCount, ' .
923 'RBLCount = excluded.RBLCount, ' .
924 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
9430b6d4
DM
925
926 my $mergefunc = sub {
986eec31 927 my ($ref) = @_;
9430b6d4 928
dbdf1298 929 $merge_sth->execute(
8bf584ae
DM
930 $ref->{time}, $ref->{countin}, $ref->{countout},
931 $ref->{bytesin}, $ref->{bytesout},
932 $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout},
933 $ref->{bouncesin}, $ref->{bouncesout}, $ref->{greylistcount},
934 $ref->{spfcount}, $ref->{rblcount}, $ref->{ptimesum}, $ref->{mtime});
9430b6d4
DM
935 };
936
986eec31 937 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DailyStat', $selectfunc, $mergefunc);
9430b6d4
DM
938}
939
940sub sync_virusinfo_db {
941 my ($dbh, $rdb, $ni) = @_;
942
943 my $selectfunc = sub {
944 my ($ctime, $lastmt) = @_;
945 return "SELECT * from VirusInfo WHERE mtime >= $lastmt";
946 };
947
80615ad6
DM
948 my $merge_sth = $dbh->prepare(
949 'INSERT INTO VirusInfo (Time,Name,Count,MTime) ' .
950 'VALUES (?,?,?,?) ' .
951 'ON CONFLICT (Time,Name) DO UPDATE SET ' .
952 'Count = excluded.Count , MTime = excluded.MTime');
953
9430b6d4 954 my $mergefunc = sub {
986eec31 955 my ($ref) = @_;
9430b6d4 956
80615ad6 957 $merge_sth->execute($ref->{time}, $ref->{name}, $ref->{count}, $ref->{mtime});
9430b6d4
DM
958 };
959
986eec31 960 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'VirusInfo', $selectfunc, $mergefunc);
9430b6d4
DM
961}
962
963sub sync_deleted_nodes_from_master {
964 my ($ldb, $masterdb, $cinfo, $masterni, $rsynctime_ref) = @_;
965
966 my $rsynctime = 0;
967
968 my $cid_hash = {}; # fast lookup
5c6e6eeb 969 foreach my $ni (values %{$cinfo->{ids}}) {
9430b6d4
DM
970 $cid_hash->{$ni->{cid}} = $ni;
971 }
972
973 my $spooldir = $PMG::MailQueue::spooldir;
974
5c6e6eeb
DM
975 my $maxcid = $cinfo->{master}->{maxcid} // 0;
976
977 for (my $rcid = 1; $rcid <= $maxcid; $rcid++) {
9430b6d4
DM
978 next if $cid_hash->{$rcid};
979
980 my $done_marker = "$spooldir/cluster/$rcid/.synced-deleted-node";
981
982 next if -f $done_marker; # already synced
983
984 syslog('info', "syncing deleted node $rcid from master '$masterni->{ip}'");
985
986 my $starttime = [ gettimeofday() ];
987 sync_spooldir($masterni->{ip}, $masterni->{name}, $rcid);
988 $$rsynctime_ref += tv_interval($starttime);
989
990 my $fake_ni = {
991 ip => $masterni->{ip},
992 name => $masterni->{name},
993 cid => $rcid,
994 };
995
996 sync_quarantine_db($ldb, $masterdb, $fake_ni);
997
998 sync_statistic_db ($ldb, $masterdb, $fake_ni);
999
1000 open(my $fh, ">>", $done_marker);
1001 }
1002}
1003
1004
0854fb22 10051;