8 use Time
::HiRes qw
(gettimeofday tv_interval
);
13 use PVE
::APIClient
::LWP
;
18 use PMG
::ClusterConfig
;
# Fragment: resolve a cluster node name to an IP address.
#
# NOTE(review): the enclosing `sub` line and the closing braces are not part
# of this listing (its embedded line numbers skip source lines).
#
# Arguments:
#   $nodename - node name to resolve
#   $noerr    - handed through unchanged to PVE::Network::get_ip_from_hostname
#
# Lookup order, as far as visible here:
#   1. the cluster configuration entry whose `name` equals $nodename
#   2. the local IP, if $nodename is 'localhost' or the local node name
#   3. a hostname lookup via PVE::Network::get_ip_from_hostname
26 my ($nodename, $noerr) = @_;
28 my $cinfo = PMG
::ClusterConfig-
>new();
30 foreach my $entry (values %{$cinfo->{ids
}}) {
31 if ($entry->{name
} eq $nodename) {
32 my $ip = $entry->{ip
};
# non-list context: the caller only gets the address itself
33 return $ip if !wantarray;
# list context: additionally hand back the address family of the IP
34 my $family = PVE
::Tools
::get_host_address_family
($ip);
35 return ($ip, $family);
39 # fallback: try to get IP by other means
40 if ($nodename eq 'localhost' || $nodename eq PVE
::INotify
::nodename
()) {
41 return PVE
::Network
::get_local_ip
();
43 return PVE
::Network
::get_ip_from_hostname
($nodename, $noerr);
# Fragment: load the cluster configuration when the caller supplied none, then
# return the `name` of the master entry if a master is defined.
# NOTE(review): the enclosing `sub` line and whatever is returned when no
# master is defined are not part of this listing - confirm against the file.
50 $cinfo = PMG
::ClusterConfig-
>new() if !$cinfo;
52 return $cinfo->{master
}->{name
} if defined($cinfo->{master
});
# Read the local API certificate (/etc/pmg/pmg-api.pem) with Net::SSLeay and
# compute its SHA-256 fingerprint.
#
# Dies with "unable to read certificate ..." when the PEM file cannot be read
# or yields no certificate, and with "unable to get fingerprint ..." when the
# fingerprint call fails or yields an undefined/empty value.
#
# NOTE(review): the declarations of $cert, $fp and $err, the error-trapping
# around the Net::SSLeay calls and the final return are not visible in this
# listing; presumably the fingerprint string is returned - confirm.
57 sub read_local_ssl_cert_fingerprint
{
58 my $cert_path = "/etc/pmg/pmg-api.pem";
# open the PEM file as an OpenSSL BIO, parse one X509 certificate from it and
# release the BIO again
62 my $bio = Net
::SSLeay
::BIO_new_file
($cert_path, 'r');
63 $cert = Net
::SSLeay
::PEM_read_bio_X509
($bio);
64 Net
::SSLeay
::BIO_free
($bio);
67 die "unable to read certificate '$cert_path' - $err\n";
70 if (!defined($cert)) {
71 die "unable to read certificate '$cert_path' - got empty value\n";
76 $fp = Net
::SSLeay
::X509_get_fingerprint
($cert, 'sha256');
79 die "unable to get fingerprint for '$cert_path' - $err\n";
82 if (!defined($fp) || $fp eq '') {
83 die "unable to get fingerprint for '$cert_path' - got empty value\n";
# File locations of the SSH key material read by read_local_cluster_info:
# the host's public RSA key and root's private/public RSA key pair.
89 my $hostrsapubkey_fn = '/etc/ssh/ssh_host_rsa_key.pub';
90 my $rootrsakey_fn = '/root/.ssh/id_rsa';
91 my $rootrsapubkey_fn = '/root/.ssh/id_rsa.pub';
# Collect the identity data of the local node into the hash $res:
#   name          - local node name (PVE::INotify::nodename)
#   ip            - local IP (PVE::Network::get_local_ip)
#   hostrsapubkey - key part of the SSH host RSA public key
#   rootrsapubkey - key part of root's SSH RSA public key
#   fingerprint   - result of read_local_ssl_cert_fingerprint()
#
# Dies with "unable to parse <file>" if either public key does not match the
# pattern from PMG::ClusterConfig::Node::valid_ssh_pubkey_regex().
#
# Side effect: when root's public key file is missing, the private key file is
# unlinked and a new 2048-bit RSA key pair with an empty passphrase is
# generated via ssh-keygen.
#
# NOTE(review): the initialisation of $res and the final return are not
# visible in this listing; presumably $res is returned - confirm.
93 sub read_local_cluster_info
{
97 my $hostrsapubkey = PVE
::Tools
::file_read_firstline
($hostrsapubkey_fn);
# reduce the line to the bare key: drop everything up to and including the
# "ssh-rsa" type tag, then a trailing "root@<host>" comment
98 $hostrsapubkey =~ s/^.*ssh-rsa\s+//i;
99 $hostrsapubkey =~ s/\s+root\@\S+\s*$//i;
101 my $sshpubkeypattern = PMG
::ClusterConfig
::Node
::valid_ssh_pubkey_regex
();
102 die "unable to parse ${hostrsapubkey_fn}\n"
103 if $hostrsapubkey !~ m/$sshpubkeypattern/;
105 my $nodename = PVE
::INotify
::nodename
();
107 $res->{name
} = $nodename;
109 $res->{ip
} = PVE
::Network
::get_local_ip
();
111 $res->{hostrsapubkey
} = $hostrsapubkey;
# no public key for root yet: remove the private key file and create a fresh
# key pair
113 if (! -f
$rootrsapubkey_fn) {
114 unlink $rootrsakey_fn;
115 my $cmd = ['ssh-keygen', '-t', 'rsa', '-N', '', '-b', '2048',
116 '-f', $rootrsakey_fn];
117 PMG
::Utils
::run_silent_cmd
($cmd);
120 my $rootrsapubkey = PVE
::Tools
::file_read_firstline
($rootrsapubkey_fn);
# same normalisation as for the host key above
121 $rootrsapubkey =~ s/^.*ssh-rsa\s+//i;
122 $rootrsapubkey =~ s/\s+root\@\S+\s*$//i;
124 die "unable to parse ${rootrsapubkey_fn}\n"
125 if $rootrsapubkey !~ m/$sshpubkeypattern/;
127 $res->{rootrsapubkey
} = $rootrsapubkey;
129 $res->{fingerprint
} = read_local_ssl_cert_fingerprint
();
134 # X509 Certificate cache helper
# File-scoped cache state:
#   $cert_cache_nodes        - node name => certificate fingerprint
#   $cert_cache_timestamp    - time() of the last cache rebuild (initially the
#                              time this file was loaded)
#   $cert_cache_fingerprints - fingerprint => 1, used as a set of accepted
#                              fingerprints
136 my $cert_cache_nodes = {};
137 my $cert_cache_timestamp = time();
138 my $cert_cache_fingerprints = {};
# Rebuild the certificate fingerprint cache from the cluster configuration:
# stamp the cache with the current time, empty both lookup hashes, then record
# each configured node's fingerprint (as a set member) and its
# node name => fingerprint mapping.
#
# NOTE(review): this listing skips source lines between the two `my`
# statements and the two assignments inside the loop, so any guard against
# entries lacking a name or fingerprint is not visible here - confirm.
140 sub update_cert_cache
{
142 $cert_cache_timestamp = time();
144 $cert_cache_fingerprints = {};
145 $cert_cache_nodes = {};
147 my $cinfo = PMG
::ClusterConfig-
>new();
149 foreach my $entry (values %{$cinfo->{ids
}}) {
150 my $node = $entry->{name
};
151 my $fp = $entry->{fingerprint
};
153 $cert_cache_fingerprints->{$fp} = 1;
154 $cert_cache_nodes->{$node} = $fp;
159 # load and cache cert fingerprint once
# NOTE(review): only the trailing condition of this sub's body is visible in
# this listing; the statement it guards and the origin of $node are elided.
# The visible condition fires when a node name is given and that node has no
# entry in $cert_cache_nodes yet - presumably triggering a cache rebuild;
# confirm against the full file.
160 sub initialize_cert_cache
{
164 if defined($node) && !defined($cert_cache_nodes->{$node});
# Check whether the SHA-256 fingerprint of $cert is one of the cached (pinned)
# cluster fingerprints.
#
# Visible behaviour:
#   - the cache is rebuilt first if it is at least 30 minutes old
#   - returns 0 when computing the fingerprint failed or gave an empty value
#   - returns 1 when the fingerprint equals a key of $cert_cache_fingerprints
#   - on a miss, and only if the cache is at least 60 seconds old, an 'info'
#     syslog entry announces a cache refresh
#
# NOTE(review): the argument unpacking, the definition of the $check code
# reference (presumably wrapping the comparison loop), the refresh/retry itself
# and the final return value are not visible in this listing - confirm.
167 sub check_cert_fingerprint
{
170 # clear cache every 30 minutes at least
171 update_cert_cache
() if time() - $cert_cache_timestamp >= 60*30;
173 # get fingerprint of server certificate
176 $fp = Net
::SSLeay
::X509_get_fingerprint
($cert, 'sha256');
178 return 0 if $@ || !defined($fp) || $fp eq ''; # error
181 for my $expected (keys %$cert_cache_fingerprints) {
182 return 1 if $fp eq $expected;
187 return 1 if $check->();
189 # clear cache and retry at most once every minute
190 if (time() - $cert_cache_timestamp >= 60) {
191 syslog
('info', "Could not verify remote node certificate '$fp' with list of pinned certificates, refreshing cache");
# Files handled by update_ssh_keys: the system-wide known-hosts file it
# rewrites, root's authorized_keys file it rewrites, and root's own public RSA
# key it reads.
199 my $sshglobalknownhosts = "/etc/ssh/ssh_known_hosts2";
200 my $rootsshauthkeys = "/root/.ssh/authorized_keys";
201 my $ssh_rsa_id = "/root/.ssh/id_rsa.pub";
# Regenerate the SSH trust files from the cluster configuration in $cinfo.
#
# Part 1 - known hosts ($sshglobalknownhosts): for every node, one
# "<ip> ssh-rsa <hostkey>" and one "<name> ssh-rsa <hostkey>" line.
#
# Part 2 - root's authorized_keys ($rootsshauthkeys): root's own public key
# (if the file exists), one "ssh-rsa <rootkey> root@<name>" line per node and
# the current contents of authorized_keys are combined; lines carrying a key
# that was already seen are dropped, and the result is written with mode 0600.
#
# NOTE(review): the argument unpacking, the initialisation of $data, $old,
# $newdata and $vhash, and the conditions attached to the two
# file_set_contents calls (their statements are not terminated in the visible
# lines) are elided from this listing - presumably the files are only written
# when the content changed; confirm.
203 sub update_ssh_keys
{
209 foreach my $node (values %{$cinfo->{ids
}}) {
210 $data .= "$node->{ip} ssh-rsa $node->{hostrsapubkey}\n";
211 $data .= "$node->{name} ssh-rsa $node->{hostrsapubkey}\n";
214 $old = PVE
::Tools
::file_get_contents
($sshglobalknownhosts, 1024*1024)
215 if -f
$sshglobalknownhosts;
217 PVE
::Tools
::file_set_contents
($sshglobalknownhosts, $data)
224 if (-f
$ssh_rsa_id) {
225 my $pub = PVE
::Tools
::file_get_contents
($ssh_rsa_id);
230 foreach my $node (values %{$cinfo->{ids
}}) {
231 $data .= "ssh-rsa $node->{rootrsapubkey} root\@$node->{name}\n";
234 if (-f
$rootsshauthkeys) {
235 my $mykey = PVE
::Tools
::file_get_contents
($rootsshauthkeys, 128*1024);
242 my @lines = split(/\n/, $data);
243 foreach my $line (@lines) {
# for non-comment lines holding an ssh-rsa/ssh-dsa key, $3 is the key blob;
# the post-increment makes the test true from the second occurrence on, so
# repeated keys are skipped
244 if ($line !~ /^#/ && $line =~ m/(^|\s)ssh-(rsa|dsa)\s+(\S+)\s+\S+$/) {
245 next if $vhash->{$3}++;
247 $newdata .= "$line\n";
250 $old = PVE
::Tools
::file_get_contents
($rootsshauthkeys, 1024*1024)
251 if -f
$rootsshauthkeys;
253 PVE
::Tools
::file_set_contents
($rootsshauthkeys, $newdata, 0600)
# $cfgdir is the configuration directory; $syncdir is its "master"
# subdirectory, used by the sync code as the source directory for files that
# are compared with and renamed into their final location.
257 my $cfgdir = '/etc/pmg';
258 my $syncdir = "$cfgdir/master";
# Conditionally move a synced file from $syncdir to its destination.
#
# Arguments:
#   $filename - file name relative to $syncdir
#   $dstfn    - optional destination path; defaults to "$cfgdir/$filename"
#
# Returns 0 when source and destination contents (each read with a 1 MiB
# limit) are identical. Otherwise the source file's times are reset to now,
# it is renamed onto the destination (dying on failure) and
# "updated <dstfn>" is printed to STDERR.
#
# NOTE(review): the handling of missing source/destination files and the
# return value after a successful update are not visible in this listing; a
# caller uses the result as a boolean "file changed" flag, so a true value is
# presumably returned - confirm.
260 my $cond_commit_synced_file = sub {
261 my ($filename, $dstfn) = @_;
263 $dstfn = "$cfgdir/$filename" if !defined($dstfn);
264 my $srcfn = "$syncdir/$filename";
271 my $new = PVE
::Tools
::file_get_contents
($srcfn, 1024*1024);
274 my $old = PVE
::Tools
::file_get_contents
($dstfn, 1024*1024);
275 return 0 if $new eq $old;
278 # set mtime (touch) to avoid time drift problems
279 utime(undef, undef, $srcfn);
281 rename($srcfn, $dstfn) ||
282 die "cond_rename_file '$filename' failed - $!\n";
284 print STDERR
"updated $dstfn\n";
# Build the argument list for a non-interactive ssh call as root.
#
# Arguments:
#   $host_key_alias - if true, added as "-o HostKeyAlias=<alias>"
#   @args           - further arguments appended verbatim
#
# NOTE(review): the return statement is not visible in this listing; a caller
# dereferences the result as an array, so the array reference $cmd is
# presumably returned - confirm.
289 my $ssh_command = sub {
290 my ($host_key_alias, @args) = @_;
# BatchMode=yes: ssh must never prompt (e.g. for a password)
292 my $cmd = ['ssh', '-l', 'root', '-o', 'BatchMode=yes'];
293 push @$cmd, '-o', "HostKeyAlias=${host_key_alias}" if $host_key_alias;
294 push @$cmd, @args if @args;
# Obtain the SHA-256 fingerprint of a remote node's API certificate by running
# `openssl x509 -fingerprint` on /etc/pmg/pmg-api.pem through ssh and parsing
# the "SHA256 Fingerprint=<32 colon-separated hex bytes>" line of its output.
#
# Dies with "parsing failed" when no fingerprint was captured; any error is
# re-raised as "unable to get remote node fingerprint from '<node name>': ...".
#
# NOTE(review): the argument unpacking ($ni), the remaining ssh arguments, the
# body of the output callback and the return statement are elided from this
# listing; presumably the captured fingerprint is returned - confirm.
298 sub get_remote_cert_fingerprint
{
301 my $ssh_cmd = $ssh_command->(
304 'openssl x509 -noout -fingerprint -sha256 -in /etc/pmg/pmg-api.pem'
308 PVE
::Tools
::run_command
($ssh_cmd, outfunc
=> sub {
310 if ($line =~ m/SHA256 Fingerprint=((?:[a-f0-9]{2}:){31}[a-f0-9]{2})/i) {
314 die "parsing failed\n" if !$fp;
316 die "unable to get remote node fingerprint from '$ni->{name}': $@\n" if $@;
# Ask the cluster master, through its API, to update the certificate
# fingerprints: POST /config/cluster/update-fingerprints.
#
# The API connection authenticates as 'root@pam' with a locally assembled
# ticket and CSRF prevention token, and accepts the master's configured
# fingerprint (plus the local certificate's fingerprint when this node is the
# master itself).
#
# Dies with "unable to lookup master node" if $cinfo has no master entry.
#
# NOTE(review): the argument unpacking ($cinfo) and some connection options
# (e.g. how $ticket is passed) are not visible in this listing.
321 sub trigger_update_fingerprints
{
324 my $master = $cinfo->{master
} || die "unable to lookup master node\n";
325 my $cached_fp = { $master->{fingerprint
} => 1 };
327 # if running on master the current fingerprint for the API-connection is needed
328 # in addition (to prevent races with restarting pmgproxy)
329 if ($cinfo->{local}->{type
} eq 'master') {
330 my $new_fp = PMG
::Cluster
::read_local_ssl_cert_fingerprint
();
331 $cached_fp->{$new_fp} = 1;
334 my $ticket = PMG
::Ticket
::assemble_ticket
('root@pam');
335 my $csrftoken = PMG
::Ticket
::assemble_csrf_prevention_token
('root@pam');
336 my $conn = PVE
::APIClient
::LWP-
>new(
338 csrftoken
=> $csrftoken,
339 cookie_name
=> 'PMGAuthCookie',
340 host
=> $master->{ip
},
341 cached_fingerprints
=> $cached_fp,
344 $conn->post("/config/cluster/update-fingerprints", {});
# Build the argument list for an rsync call that uses the ssh command from
# $ssh_command (with the given host key alias) as its remote shell.
#
# Arguments:
#   $host_key_alias - handed to $ssh_command
#   @args           - rsync arguments appended after '-q'
#
# NOTE(review): the return statement is not visible in this listing; callers
# push onto and run the result, so the array reference $cmd is presumably
# returned - confirm.
348 my $rsync_command = sub {
349 my ($host_key_alias, @args) = @_;
# rsync takes the remote shell as one string, so the ssh argument list is
# joined with spaces
351 my $ssh_cmd = join(' ', @{$ssh_command->($host_key_alias)});
353 my $cmd = ['rsync', "--rsh=$ssh_cmd", '-q', @args];
# Copy the quarantine files listed in $flistname from a remote node's spool
# directory into the local spool directory with rsync (--files-from, 10 second
# I/O timeout).
#
# Arguments:
#   $host_ip   - remote address, used (bracketed) as the rsync source host
#   $host_name - used as the ssh host key alias
#   $flistname - file containing the list of paths to transfer
#   $rcid      - remote cluster node id; determines the local
#                "<spooldir>/cluster/<rcid>" directory
#
# NOTE(review): the visible lines compute $syncdir without using it; the
# statement doing so is presumably among the elided source lines - confirm.
358 sub sync_quarantine_files
{
359 my ($host_ip, $host_name, $flistname, $rcid) = @_;
361 my $spooldir = $PMG::MailQueue
::spooldir
;
# result deliberately unchecked - the directory may already exist
363 mkdir "$spooldir/cluster/";
364 my $syncdir = "$spooldir/cluster/$rcid";
367 my $cmd = $rsync_command->(
368 $host_name, '--timeout', '10', "[${host_ip}]:$spooldir", $spooldir,
369 '--files-from', $flistname);
371 PVE
::Tools
::run_command
($cmd);
# Fragment: mirror the spam/ and virus/ subtrees (two directory levels deep) of
# "<spooldir>/cluster/<rcid>" from a remote node into the same local path with
# rsync in archive mode; everything else is excluded.
#
# Arguments: $host_ip (rsync source host), $host_name (ssh host key alias),
# $rcid (cluster node id selecting the directory).
#
# NOTE(review): the enclosing `sub` line is not part of this listing.
375 my ($host_ip, $host_name, $rcid) = @_;
377 my $spooldir = $PMG::MailQueue
::spooldir
;
379 mkdir "$spooldir/cluster/";
380 my $syncdir = "$spooldir/cluster/$rcid";
383 my $cmd = $rsync_command->(
384 $host_name, '-aq', '--timeout', '10', "[${host_ip}]:$syncdir/", $syncdir);
# include rules have to be listed before the catch-all exclude below
386 foreach my $incl (('spam/', 'spam/*', 'spam/*/*', 'virus/', 'virus/*', 'virus/*/*')) {
387 push @$cmd, '--include', $incl;
390 push @$cmd, '--exclude', '*';
392 PVE
::Tools
::run_command
($cmd);
# Mirror the whole "<spooldir>/cluster/" directory from the given host into the
# same local path with rsync in archive mode (10 second I/O timeout).
#
# Arguments: $host_ip (rsync source host), $host_name (ssh host key alias).
#
# NOTE(review): source lines between the visible statements are elided from
# this listing.
395 sub sync_master_quar
{
396 my ($host_ip, $host_name) = @_;
398 my $spooldir = $PMG::MailQueue
::spooldir
;
400 my $syncdir = "$spooldir/cluster/";
403 my $cmd = $rsync_command->(
404 $host_name, '-aq', '--timeout', '10', "[${host_ip}]:$syncdir", $syncdir);
406 PVE
::Tools
::run_command
($cmd);
# Pull the configuration from the cluster master and commit it locally.
#
# Arguments:
#   $master_name - master node name (NOTE(review): its use is not visible in
#                  this listing - presumably the ssh host key alias; confirm)
#   $master_ip   - master address, used as rsync source host
#   $noreload    - NOTE(review): not referenced in the visible lines
#
# Visible steps:
#   1. empty $syncdir (keeping the directory itself)
#   2. rsync the master's $cfgdir/* and the two SpamAssassin files, excluding
#      master/ and the pmg-api.pem / pmg-tls.pem certificates
#   3. parse the synced cluster.conf and die unless $master_ip is the master
#      and the local node is a non-master cluster member
#   4. commit cluster.conf, rewrite the ssh keys, disable fetchmail
#   5. commit the files in @$files and rsync the directories in @$dirs
#      (both lists are only partly visible here)
#   6. commit the SpamAssassin files and pmg.conf
#
# Returns a hash reference of services needing a restart; 'pmg-smtp-filter' is
# set when one of the SpamAssassin files changed.
409 sub sync_config_from_master
{
410 my ($master_name, $master_ip, $noreload) = @_;
413 File
::Path
::remove_tree
($syncdir, {keep_root
=> 1});
415 my $sa_conf_dir = "/etc/mail/spamassassin";
416 my $sa_custom_cf = "custom.cf";
417 my $sa_rules_cf = "pmg-scores.cf";
419 my $cmd = $rsync_command->(
421 "[${master_ip}]:$cfgdir/*",
422 "[${master_ip}]:${sa_conf_dir}/${sa_custom_cf}",
423 "[${master_ip}]:${sa_conf_dir}/${sa_rules_cf}",
425 '--exclude', 'master/',
428 '--exclude', 'pmg-api.pem',
429 '--exclude', 'pmg-tls.pem',
432 my $errmsg = "syncing master configuration from '${master_ip}' failed";
433 PVE
::Tools
::run_command
($cmd, errmsg
=> $errmsg);
435 # verify that the remote host is cluster master
436 open (my $fh, '<', "$syncdir/cluster.conf") ||
437 die "unable to open synced cluster.conf - $!\n";
439 my $cinfo = PMG
::ClusterConfig
::read_cluster_conf
('cluster.conf', $fh);
441 if (!$cinfo->{master
} || ($cinfo->{master
}->{ip
} ne $master_ip)) {
442 die "host '$master_ip' is not cluster master\n";
# '-' stands in for a local node without a type
445 my $role = $cinfo->{'local'}->{type
} // '-';
446 die "local node '$cinfo->{local}->{name}' not part of cluster\n"
449 die "local node '$cinfo->{local}->{name}' is new cluster master\n"
450 if $role eq 'master';
452 $cond_commit_synced_file->('cluster.conf');
454 update_ssh_keys
($cinfo); # rewrite ssh keys
456 PMG
::Fetchmail
::update_fetchmail_default
(0); # disable on slave
469 'tls_inbound_domains',
473 foreach my $filename (@$files) {
474 $cond_commit_synced_file->($filename);
484 foreach my $dir (@$dirs) {
485 my $srcdir = "$syncdir/$dir";
# local copy; --delete-after removes destination files missing in the source
488 my $cmd = ['rsync', '-aq', '--delete-after', "$srcdir/", "$cfgdir/$dir"];
489 PVE
::Tools
::run_command
($cmd);
494 my $force_restart = {};
496 for my $file (($sa_custom_cf, $sa_rules_cf)) {
497 if ($cond_commit_synced_file->($file, "${sa_conf_dir}/${file}")) {
498 $force_restart->{'pmg-smtp-filter'} = 1;
502 $cond_commit_synced_file->('pmg.conf');
504 return $force_restart;
# Replace the local rule database with a copy of the master's when the two
# differ.
#
# Arguments:
#   $ldb    - local database handle
#   $rdb    - remote (master) database handle
#   $ni     - node info hash of the master (ip, fingerprint are read here)
#   $ticket - NOTE(review): its use is not visible in this listing -
#             presumably the API authentication ticket; confirm
#
# The master's rule database digest is fetched through the API; if it equals
# the digest of the local rule cache, the sub returns without changes.
# Otherwise the five rule tables are emptied and re-copied from the remote
# handle, the id sequences are reset from the copied data, and the rule
# database is reloaded. Start and end are logged via syslog.
#
# NOTE(review): transaction begin/commit and error handling on the local
# handle are among the elided source lines.
507 sub sync_ruledb_from_master
{
508 my ($ldb, $rdb, $ni, $ticket) = @_;
510 my $ruledb = PMG
::RuleDB-
>new($ldb);
511 my $rulecache = PMG
::RuleCache-
>new($ruledb);
513 my $conn = PVE
::APIClient
::LWP-
>new(
515 cookie_name
=> 'PMGAuthCookie',
517 cached_fingerprints
=> {
518 $ni->{fingerprint
} => 1,
521 my $digest = $conn->get("/config/ruledb/digest", {});
523 return if $digest eq $rulecache->{digest
}; # no changes
525 syslog
('info', "detected rule database changes - starting sync from '$ni->{ip}'");
530 $ldb->do("DELETE FROM Rule");
531 $ldb->do("DELETE FROM RuleGroup");
532 $ldb->do("DELETE FROM ObjectGroup");
533 $ldb->do("DELETE FROM Object");
534 $ldb->do("DELETE FROM Attribut");
539 # read a consistent snapshot
540 $rdb->do("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
542 PMG
::DBTools
::copy_table
($ldb, $rdb, "Rule");
543 PMG
::DBTools
::copy_table
($ldb, $rdb, "RuleGroup");
544 PMG
::DBTools
::copy_table
($ldb, $rdb, "ObjectGroup");
545 PMG
::DBTools
::copy_table
($ldb, $rdb, "Object", 'value');
546 PMG
::DBTools
::copy_table
($ldb, $rdb, "Attribut", 'value');
549 $rdb->rollback; # end transaction
# continue the id sequences one past the highest copied id
555 $ldb->do("SELECT setval('rule_id_seq', max(id)+1) FROM Rule");
556 $ldb->do("SELECT setval('object_id_seq', max(id)+1) FROM Object");
557 $ldb->do("SELECT setval('objectgroup_id_seq', max(id)+1) FROM ObjectGroup");
566 PMG
::DBTools
::reload_ruledb
();
568 syslog
('info', "finished rule database sync from host '$ni->{ip}'");
# Copy new quarantine entries, and status changes of existing ones, for the
# node $ni->{cid} from the remote database into the local one.
#
# Arguments:
#   $ldb           - local database handle
#   $rdb           - remote database handle
#   $ni            - node info hash (cid, ip, name are read here)
#   $rsynctime_ref - scalar reference; the time spent in
#                    sync_quarantine_files is added to it
#
# Phase 1 (do/while loop): starting after the stored 'lastid_CMailStore',
# fetch up to $maxcount CMailStore rows, write their file names to a list
# file, copy the rows, rsync the listed files, copy the matching CMSReceivers
# rows and store the new high-water mark. The loop repeats while full batches
# come back and fewer than $maxmails mails were handled.
#
# Phase 2 (eval): for remote CMSReceivers rows modified since
# 'lastmt_CMSReceivers' whose status is not 'N', update the status of the
# matching local rows, then store the remote time taken at the start as the
# new 'lastmt_CMSReceivers'.
#
# NOTE(review): the declarations of $maxcount, $count, $mscount, $maxid and
# $callback, transaction handling, list-file cleanup and the return value are
# among the elided source lines.
571 sub sync_quarantine_db
{
572 my ($ldb, $rdb, $ni, $rsynctime_ref) = @_;
574 my $rcid = $ni->{cid
};
576 my $maxmails = 100000;
580 my $ctime = PMG
::DBTools
::get_remote_time
($rdb);
586 PMG
::DBTools
::create_clusterinfo_default
($ldb, $rcid, 'lastid_CMailStore', -1, undef);
588 do { # get new values
# NOTE(review): fixed, PID-based file name in /tmp - consider File::Temp if
# this path can be reached with untrusted local users present; confirm
592 my $flistname = "/tmp/quarantinefilelist.$$";
597 open(my $flistfh, '>', $flistname) ||
598 die "unable to open file '$flistname' - $!\n";
600 my $lastid = PMG
::DBTools
::read_int_clusterinfo
($ldb, $rcid, 'lastid_CMailStore');
604 my $sth = $rdb->prepare(
605 "SELECT * from CMailstore WHERE cid = ? AND rid > ? " .
606 "ORDER BY cid,rid LIMIT ?");
607 $sth->execute($rcid, $lastid, $maxcount);
# the next lines belong to the per-row callback handed to copy_selected_data
# (its opening line is elided): remember the row id and list the row's file
612 $maxid = $ref->{rid
};
613 my $filename = $ref->{file
};
614 # skip files generated before cluster was created
615 return if $filename !~ m!^cluster/!;
616 print $flistfh "$filename\n";
619 my $attrs = [qw(cid rid time qtype bytes spamlevel info sender header file)];
620 $count += PMG
::DBTools
::copy_selected_data
($ldb, $sth, 'CMailStore', $attrs, $callback);
# measure only the file transfer and add it to the caller's accumulator
624 my $starttime = [ gettimeofday
() ];
625 sync_quarantine_files
($ni->{ip
}, $ni->{name
}, $flistname, $rcid);
626 $$rsynctime_ref += tv_interval
($starttime);
631 $sth = $rdb->prepare(
632 "SELECT * from CMSReceivers WHERE " .
633 "CMailStore_CID = ? AND CMailStore_RID > ? " .
634 "AND CMailStore_RID <= ?");
635 $sth->execute($rcid, $lastid, $maxid);
637 $attrs = [qw(cmailstore_cid cmailstore_rid pmail receiver ticketid status mtime)];
638 PMG
::DBTools
::copy_selected_data
($ldb, $sth, 'CMSReceivers', $attrs);
640 PMG
::DBTools
::write_maxint_clusterinfo
($ldb, $rcid, 'lastid_CMailStore', $maxid);
656 } while (($count >= $maxcount) && ($mscount < $maxmails));
658 PMG
::DBTools
::create_clusterinfo_default
($ldb, $rcid, 'lastmt_CMSReceivers', 0, undef);
660 eval { # synchronize status updates
663 my $lastmt = PMG
::DBTools
::read_int_clusterinfo
($ldb, $rcid, 'lastmt_CMSReceivers');
665 my $sth = $rdb->prepare ("SELECT * from CMSReceivers WHERE mtime >= ? AND status != 'N'");
666 $sth->execute($lastmt);
668 my $update_sth = $ldb->prepare(
669 "UPDATE CMSReceivers SET status = ? WHERE " .
670 "CMailstore_CID = ? AND CMailstore_RID = ? AND TicketID = ?");
671 while (my $ref = $sth->fetchrow_hashref()) {
672 $update_sth->execute($ref->{status
}, $ref->{cmailstore_cid
},
673 $ref->{cmailstore_rid
}, $ref->{ticketid
});
676 PMG
::DBTools
::write_maxint_clusterinfo
($ldb, $rcid, 'lastmt_CMSReceivers', $ctime);
# Copy new statistic entries for the node $ni->{cid} from the remote database
# into the local one.
#
# Arguments: $ldb (local database handle), $rdb (remote database handle),
# $ni (node info hash; only cid is read in the visible lines).
#
# In a do/while loop: starting after the stored 'lastid_CStatistic', fetch up
# to $maxcount CStatistic rows, copy them, copy the CReceivers rows belonging
# to the copied id range and store the new high-water mark. The loop repeats
# while full batches come back and fewer than $maxmails entries were handled.
#
# NOTE(review): the declarations of $maxcount, $count, $mscount, $maxid and
# $callback, transaction handling and the return value are among the elided
# source lines.
688 sub sync_statistic_db
{
689 my ($ldb, $rdb, $ni) = @_;
691 my $rcid = $ni->{cid
};
693 my $maxmails = 100000;
701 PMG
::DBTools
::create_clusterinfo_default
(
702 $ldb, $rcid, 'lastid_CStatistic', -1, undef);
704 do { # get new values
711 my $lastid = PMG
::DBTools
::read_int_clusterinfo
(
712 $ldb, $rcid, 'lastid_CStatistic');
716 my $sth = $rdb->prepare(
717 "SELECT * from CStatistic " .
718 "WHERE cid = ? AND rid > ? " .
719 "ORDER BY cid, rid LIMIT ?");
720 $sth->execute($rcid, $lastid, $maxcount);
# part of the per-row callback (its opening line is elided): track the row id
725 $maxid = $ref->{rid
};
728 my $attrs = [qw(cid rid time bytes direction spamlevel ptime virusinfo sender)];
729 $count += PMG
::DBTools
::copy_selected_data
($ldb, $sth, 'CStatistic', $attrs, $callback);
734 $sth = $rdb->prepare(
735 "SELECT * from CReceivers WHERE " .
736 "CStatistic_CID = ? AND CStatistic_RID > ? AND CStatistic_RID <= ?");
737 $sth->execute($rcid, $lastid, $maxid);
739 $attrs = [qw(cstatistic_cid cstatistic_rid blocked receiver)];
740 PMG
::DBTools
::copy_selected_data
($ldb, $sth, 'CReceivers', $attrs);
743 PMG
::DBTools
::write_maxint_clusterinfo
($ldb, $rcid, 'lastid_CStatistic', $maxid);
754 } while (($count >= $maxcount) && ($mscount < $maxmails));
# Shared driver for the mtime-based table syncs.
#
# Arguments:
#   $ldb        - local database handle
#   $rdb        - remote database handle
#   $ni         - node info hash (cid is read here)
#   $table      - table name; also selects the "lastmt_<table>" cluster-info key
#   $selectfunc - code ref called as ($ctime, $lastmt), returns the SELECT
#                 statement to run on the remote handle
#   $mergefunc  - code ref for merging a fetched row (NOTE(review): its call is
#                 among the elided source lines - confirm)
#
# The remote time is taken first and, after the rows were processed, stored as
# the new "lastmt_<table>" value. Rows are handled inside local transactions,
# with $max bounding the number of rows per transaction.
#
# NOTE(review): the statement execution, the declaration of $count, the
# commit/reset inside the loop, error handling and the return value are not
# visible in this listing.
759 my $sync_generic_mtime_db = sub {
760 my ($ldb, $rdb, $ni, $table, $selectfunc, $mergefunc) = @_;
762 my $ctime = PMG
::DBTools
::get_remote_time
($rdb);
764 PMG
::DBTools
::create_clusterinfo_default
($ldb, $ni->{cid
}, "lastmt_$table", 0, undef);
766 my $lastmt = PMG
::DBTools
::read_int_clusterinfo
($ldb, $ni->{cid
}, "lastmt_$table");
768 my $sql_cmd = $selectfunc->($ctime, $lastmt);
770 my $sth = $rdb->prepare($sql_cmd);
777 # use transaction to speedup things
778 my $max = 1000; # UPDATE MAX ENTRIES AT ONCE
780 while (my $ref = $sth->fetchrow_hashref()) {
# open a transaction when starting a new batch
781 $ldb->begin_work if !$count;
783 if (++$count >= $max) {
# commit whatever is left of the last, partial batch
790 $ldb->commit if $count;
797 PMG
::DBTools
::write_maxint_clusterinfo
($ldb, $ni->{cid
}, "lastmt_$table", $ctime);
# Sync the LocalStat table for node $ni->{cid} through $sync_generic_mtime_db.
#
# Arguments: $dbh (local database handle), $rdb (remote database handle),
# $ni (node info hash).
# Returns whatever $sync_generic_mtime_db returns.
#
# Remote rows of that node modified since the last sync are upserted locally,
# keyed on (Time, CID); on conflict the counters and MTime are overwritten.
802 sub sync_localstat_db
{
803 my ($dbh, $rdb, $ni) = @_;
805 my $rcid = $ni->{cid
};
807 my $selectfunc = sub {
808 my ($ctime, $lastmt) = @_;
# NOTE(review): $lastmt and $rcid are interpolated into the SQL text;
# presumably both are integers - confirm
809 return "SELECT * from LocalStat WHERE mtime >= $lastmt AND cid = $rcid";
812 my $merge_sth = $dbh->prepare(
813 'INSERT INTO LocalStat (Time, RBLCount, PregreetCount, CID, MTime) ' .
814 'VALUES (?, ?, ?, ?, ?) ' .
815 'ON CONFLICT (Time, CID) DO UPDATE SET ' .
816 'RBLCount = excluded.RBLCount, PregreetCount = excluded.PregreetCount, MTime = excluded.MTime');
818 my $mergefunc = sub {
821 $merge_sth->execute($ref->{time}, $ref->{rblcount
}, $ref->{pregreetcount
}, $ref->{cid
}, $ref->{mtime
});
824 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'LocalStat', $selectfunc, $mergefunc);
# Sync the CGreylist table through $sync_generic_mtime_db.
#
# Arguments: $dbh (local database handle), $rdb (remote database handle),
# $ni (node info hash).
# Returns whatever $sync_generic_mtime_db returns.
#
# Selected are remote rows with extime >= the remote time, mtime >= the last
# sync time and a non-zero CID; they are merged with the statement from
# PMG::DBTools::cgreylist_merge_sql().
#
# NOTE(review): the argument unpacking of the merge callback and the start of
# its execute call are among the elided source lines.
827 sub sync_greylist_db
{
828 my ($dbh, $rdb, $ni) = @_;
830 my $selectfunc = sub {
831 my ($ctime, $lastmt) = @_;
832 return "SELECT * from CGreylist WHERE extime >= $ctime AND " .
833 "mtime >= $lastmt AND CID != 0";
836 my $merge_sth = $dbh->prepare(PMG
::DBTools
::cgreylist_merge_sql
());
837 my $mergefunc = sub {
840 my $ipnet = $ref->{ipnet
};
# a value without a "/<digits>" suffix gets '.0/24' appended
# (presumably a legacy three-octet prefix - confirm)
841 $ipnet .= '.0/24' if $ipnet !~ /\
/\d+$/;
843 $ipnet, $ref->{sender
}, $ref->{receiver
},
844 $ref->{instance
}, $ref->{rctime
}, $ref->{extime
}, $ref->{delay
},
845 $ref->{blocked
}, $ref->{passed
}, 0, $ref->{cid
});
848 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'CGreylist', $selectfunc, $mergefunc);
# Sync the UserPrefs table through $sync_generic_mtime_db.
#
# Arguments: $dbh (local database handle), $rdb (remote database handle),
# $ni (node info hash).
# Returns whatever $sync_generic_mtime_db returns.
#
# Remote rows modified since the last sync are upserted locally, keyed on
# (PMail, Name). On conflict the incoming row wins only if its MTime is not
# older than the local one; in that case Data is taken over and the local
# MTime is set to 0, otherwise the local row is left as it is.
851 sub sync_userprefs_db
{
852 my ($dbh, $rdb, $ni) = @_;
854 my $selectfunc = sub {
855 my ($ctime, $lastmt) = @_;
857 return "SELECT * from UserPrefs WHERE mtime >= $lastmt";
860 my $merge_sth = $dbh->prepare(
861 "INSERT INTO UserPrefs (PMail, Name, Data, MTime) " .
862 'VALUES (?, ?, ?, ?) ' .
863 'ON CONFLICT (PMail, Name) DO UPDATE SET ' .
864 # Note: MTime = 0 ==> this is just a copy from somewhere else, not modified
865 'MTime = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN 0 ELSE UserPrefs.MTime END, ' .
866 'Data = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN excluded.Data ELSE UserPrefs.Data END');
868 my $mergefunc = sub {
871 $merge_sth->execute($ref->{pmail
}, $ref->{name
}, $ref->{data
}, $ref->{mtime
});
874 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'UserPrefs', $selectfunc, $mergefunc);
# Sync the DomainStat table through $sync_generic_mtime_db.
#
# Arguments: $dbh (local database handle), $rdb (remote database handle),
# $ni (node info hash).
# Returns whatever $sync_generic_mtime_db returns.
#
# Remote rows modified since the last sync are upserted locally, keyed on
# (Time, Domain); on conflict every counter column and MTime are overwritten
# with the incoming values.
#
# NOTE(review): the argument unpacking of the merge callback and the start of
# its execute call are among the elided source lines.
877 sub sync_domainstat_db
{
878 my ($dbh, $rdb, $ni) = @_;
880 my $selectfunc = sub {
881 my ($ctime, $lastmt) = @_;
882 return "SELECT * from DomainStat WHERE mtime >= $lastmt";
885 my $merge_sth = $dbh->prepare(
886 'INSERT INTO Domainstat ' .
887 '(Time,Domain,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
888 'BouncesIn,BouncesOut,PTimeSum,Mtime) ' .
889 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
890 'ON CONFLICT (Time, Domain) DO UPDATE SET ' .
891 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
892 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
893 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
894 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
895 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
896 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
898 my $mergefunc = sub {
902 $ref->{time}, $ref->{domain
}, $ref->{countin
}, $ref->{countout
},
903 $ref->{bytesin
}, $ref->{bytesout
},
904 $ref->{virusin
}, $ref->{virusout
}, $ref->{spamin
}, $ref->{spamout
},
905 $ref->{bouncesin
}, $ref->{bouncesout
}, $ref->{ptimesum
}, $ref->{mtime
});
908 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DomainStat', $selectfunc, $mergefunc);
# Sync the DailyStat table through $sync_generic_mtime_db.
#
# Arguments: $dbh (local database handle), $rdb (remote database handle),
# $ni (node info hash).
# Returns whatever $sync_generic_mtime_db returns.
#
# Remote rows modified since the last sync are upserted locally, keyed on
# Time; on conflict every counter column and MTime are overwritten with the
# incoming values.
#
# NOTE(review): the argument unpacking of the merge callback and the start of
# its execute call are among the elided source lines.
911 sub sync_dailystat_db
{
912 my ($dbh, $rdb, $ni) = @_;
914 my $selectfunc = sub {
915 my ($ctime, $lastmt) = @_;
916 return "SELECT * from DailyStat WHERE mtime >= $lastmt";
919 my $merge_sth = $dbh->prepare(
920 'INSERT INTO DailyStat ' .
921 '(Time,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
922 'BouncesIn,BouncesOut,GreylistCount,SPFCount,RBLCount,PTimeSum,Mtime) ' .
923 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
924 'ON CONFLICT (Time) DO UPDATE SET ' .
925 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
926 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
927 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
928 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
929 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
930 'GreylistCount = excluded.GreylistCount, SPFCount = excluded.SpfCount, ' .
931 'RBLCount = excluded.RBLCount, ' .
932 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
934 my $mergefunc = sub {
938 $ref->{time}, $ref->{countin
}, $ref->{countout
},
939 $ref->{bytesin
}, $ref->{bytesout
},
940 $ref->{virusin
}, $ref->{virusout
}, $ref->{spamin
}, $ref->{spamout
},
941 $ref->{bouncesin
}, $ref->{bouncesout
}, $ref->{greylistcount
},
942 $ref->{spfcount
}, $ref->{rblcount
}, $ref->{ptimesum
}, $ref->{mtime
});
945 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DailyStat', $selectfunc, $mergefunc);
# Sync the VirusInfo table through $sync_generic_mtime_db.
#
# Arguments: $dbh (local database handle), $rdb (remote database handle),
# $ni (node info hash).
# Returns whatever $sync_generic_mtime_db returns.
#
# Remote rows modified since the last sync are upserted locally, keyed on
# (Time, Name); on conflict Count and MTime are overwritten with the incoming
# values.
948 sub sync_virusinfo_db
{
949 my ($dbh, $rdb, $ni) = @_;
951 my $selectfunc = sub {
952 my ($ctime, $lastmt) = @_;
953 return "SELECT * from VirusInfo WHERE mtime >= $lastmt";
956 my $merge_sth = $dbh->prepare(
957 'INSERT INTO VirusInfo (Time,Name,Count,MTime) ' .
958 'VALUES (?,?,?,?) ' .
959 'ON CONFLICT (Time,Name) DO UPDATE SET ' .
960 'Count = excluded.Count , MTime = excluded.MTime');
962 my $mergefunc = sub {
965 $merge_sth->execute($ref->{time}, $ref->{name
}, $ref->{count
}, $ref->{mtime
});
968 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'VirusInfo', $selectfunc, $mergefunc);
971 sub sync_deleted_nodes_from_master
{
972 my ($ldb, $masterdb, $cinfo, $masterni, $rsynctime_ref) = @_;
976 my $cid_hash = {}; # fast lookup
977 foreach my $ni (values %{$cinfo->{ids
}}) {
978 $cid_hash->{$ni->{cid
}} = $ni;
981 my $spooldir = $PMG::MailQueue
::spooldir
;
983 my $maxcid = $cinfo->{master
}->{maxcid
} // 0;
985 for (my $rcid = 1; $rcid <= $maxcid; $rcid++) {
986 next if $cid_hash->{$rcid};
988 my $done_marker = "$spooldir/cluster/$rcid/.synced-deleted-node";
990 next if -f
$done_marker; # already synced
992 syslog
('info', "syncing deleted node $rcid from master '$masterni->{ip}'");
994 my $starttime = [ gettimeofday
() ];
995 sync_spooldir
($masterni->{ip
}, $masterni->{name
}, $rcid);
996 $$rsynctime_ref += tv_interval
($starttime);
999 ip
=> $masterni->{ip
},
1000 name
=> $masterni->{name
},
1004 sync_quarantine_db
($ldb, $masterdb, $fake_ni);
1006 sync_statistic_db
($ldb, $masterdb, $fake_ni);
1008 open(my $fh, ">>", $done_marker);