8 use Time
::HiRes qw
(gettimeofday tv_interval
);
16 use PMG
::ClusterConfig
;
20 use PVE
::APIClient
::LWP
;
# Resolve the IP address of the cluster node named $nodename.
#
# Arguments:
#   $nodename - node name compared against the 'name' field of every entry
#               in the cluster configuration's 'ids' table
#   $noerr    - passed through unchanged to PMG::Utils::lookup_node_ip
#
# Returns the matching entry's 'ip' in scalar context, or ($ip, $family) in
# list context, where $family comes from PVE::Tools::get_host_address_family.
# NOTE(review): the enclosing 'sub' line is not visible in this listing -
# confirm the function name against the full file.
23 my ($nodename, $noerr) = @_;
25 my $cinfo = PMG
::ClusterConfig-
>new();
27 foreach my $entry (values %{$cinfo->{ids
}}) {
28 if ($entry->{name
} eq $nodename) {
29 my $ip = $entry->{ip
};
# scalar-context callers only want the address itself
30 return $ip if !wantarray;
31 my $family = PVE
::Tools
::get_host_address_family
($ip);
32 return ($ip, $family);
36 # fallback: try to get IP by other means
37 return PMG
::Utils
::lookup_node_ip
($nodename, $noerr);
# Return the node name of the cluster master.
# NOTE(review): the enclosing 'sub' line, the unpacking of $cinfo and any
# fallback return value are not visible in this listing.
#
# Load the cluster configuration when no (true) $cinfo is available.
43 $cinfo = PMG
::ClusterConfig-
>new() if !$cinfo;
# Only answer from the config when it actually has a master entry.
45 return $cinfo->{master
}->{name
} if defined($cinfo->{master
});
# Compute the SHA-256 fingerprint of the local API certificate.
#
# Reads the PEM file /etc/pmg/pmg-api.pem through a Net::SSLeay BIO and asks
# Net::SSLeay for the 'sha256' fingerprint of the parsed X509 object.
#
# Dies with a message naming the certificate path when the certificate or
# the fingerprint cannot be obtained, or when either one is empty.
# NOTE(review): the declarations of $cert/$fp/$err, the error-trapping
# around the Net::SSLeay calls and the final return are not visible in this
# listing; presumably $fp is returned, since read_local_cluster_info stores
# the result of this function as 'fingerprint'.
50 sub read_local_ssl_cert_fingerprint
{
51 my $cert_path = "/etc/pmg/pmg-api.pem";
55 my $bio = Net
::SSLeay
::BIO_new_file
($cert_path, 'r');
56 $cert = Net
::SSLeay
::PEM_read_bio_X509
($bio);
# release the BIO once the certificate has been parsed
57 Net
::SSLeay
::BIO_free
($bio);
60 die "unable to read certificate '$cert_path' - $err\n";
63 if (!defined($cert)) {
64 die "unable to read certificate '$cert_path' - got empty value\n";
69 $fp = Net
::SSLeay
::X509_get_fingerprint
($cert, 'sha256');
72 die "unable to get fingerprint for '$cert_path' - $err\n";
# an undefined or empty fingerprint is treated as a failure as well
75 if (!defined($fp) || $fp eq '') {
76 die "unable to get fingerprint for '$cert_path' - got empty value\n";
# File-scoped paths of the SSH key files used by read_local_cluster_info:
# the host's public RSA key and root's private/public RSA key pair.
82 my $hostrsapubkey_fn = '/etc/ssh/ssh_host_rsa_key.pub';
83 my $rootrsakey_fn = '/root/.ssh/id_rsa';
84 my $rootrsapubkey_fn = '/root/.ssh/id_rsa.pub';
# Collect the identity data of the local node into $res.
#
# Fields filled in by the visible code:
#   name          - PVE::INotify::nodename()
#   ip            - PMG::Utils::lookup_node_ip() for that name
#   hostrsapubkey - key material from /etc/ssh/ssh_host_rsa_key.pub
#   rootrsapubkey - key material from /root/.ssh/id_rsa.pub
#   fingerprint   - result of read_local_ssl_cert_fingerprint()
#
# Side effect: when root's public key file does not exist, the private key
# file is unlinked and a new 2048 bit RSA key pair with an empty passphrase
# is generated with ssh-keygen.
#
# Dies with "unable to parse <file>" when a key does not look like at least
# 200 characters from the set [A-Za-z0-9./+].
# NOTE(review): the declaration of $res and the return statement are not
# visible in this listing.
86 sub read_local_cluster_info
{
90 my $hostrsapubkey = PVE
::Tools
::file_read_firstline
($hostrsapubkey_fn);
# strip everything up to and including the "ssh-rsa" type tag ...
91 $hostrsapubkey =~ s/^.*ssh-rsa\s+//i;
# ... and a trailing "root@<host>" comment, leaving only the key material
92 $hostrsapubkey =~ s/\s+root\@\S+\s*$//i;
94 die "unable to parse ${hostrsapubkey_fn}\n"
95 if $hostrsapubkey !~ m/^[A-Za-z0-9\.\/\
+]{200,}$/;
97 my $nodename = PVE
::INotify
::nodename
();
99 $res->{name
} = $nodename;
101 $res->{ip
} = PMG
::Utils
::lookup_node_ip
($nodename);
103 $res->{hostrsapubkey
} = $hostrsapubkey;
# no public key for root yet: drop any stale private key and create a pair
105 if (! -f
$rootrsapubkey_fn) {
106 unlink $rootrsakey_fn;
107 my $cmd = ['ssh-keygen', '-t', 'rsa', '-N', '', '-b', '2048',
108 '-f', $rootrsakey_fn];
109 PMG
::Utils
::run_silent_cmd
($cmd);
112 my $rootrsapubkey = PVE
::Tools
::file_read_firstline
($rootrsapubkey_fn);
# same normalization as for the host key above
113 $rootrsapubkey =~ s/^.*ssh-rsa\s+//i;
114 $rootrsapubkey =~ s/\s+root\@\S+\s*$//i;
116 die "unable to parse ${rootrsapubkey_fn}\n"
117 if $rootrsapubkey !~ m/^[A-Za-z0-9\.\/\
+]{200,}$/;
119 $res->{rootrsapubkey
} = $rootrsapubkey;
121 $res->{fingerprint
} = read_local_ssl_cert_fingerprint
();
126 # X509 Certificate cache helper
# File-scoped cache state shared by the cert cache functions below:
#   $cert_cache_nodes        - node name => fingerprint
#   $cert_cache_timestamp    - time() of the last cache (re)build
#   $cert_cache_fingerprints - fingerprint => 1 (set of known fingerprints)
128 my $cert_cache_nodes = {};
129 my $cert_cache_timestamp = time();
130 my $cert_cache_fingerprints = {};
# Rebuild the certificate fingerprint cache from the cluster configuration.
#
# Resets the cache timestamp to the current time, empties both cache hashes
# and refills them from every entry in the cluster config's 'ids' table:
# the fingerprint is added to the set of known fingerprints and recorded
# under the entry's node name.
132 sub update_cert_cache
{
134 $cert_cache_timestamp = time();
136 $cert_cache_fingerprints = {};
137 $cert_cache_nodes = {};
139 my $cinfo = PMG
::ClusterConfig-
>new();
141 foreach my $entry (values %{$cinfo->{ids
}}) {
142 my $node = $entry->{name
};
143 my $fp = $entry->{fingerprint
};
145 $cert_cache_fingerprints->{$fp} = 1;
146 $cert_cache_nodes->{$node} = $fp;
151 # load and cache cert fingerprint once
# NOTE(review): only the guard of this function is visible in this listing.
# Whatever statement it guards runs only when $node is defined and has no
# entry in $cert_cache_nodes yet - presumably a call to update_cert_cache;
# confirm against the full file.
152 sub initialize_cert_cache
{
156 if defined($node) && !defined($cert_cache_nodes->{$node});
# Check a server certificate against the cached (pinned) fingerprints.
#
# Takes the SHA-256 fingerprint of $cert and returns 1 when it equals one of
# the keys of $cert_cache_fingerprints. Returns 0 when the fingerprint could
# not be determined ($@ set, undefined or empty value).
#
# The cache is rebuilt when it is at least 30 minutes old. After a failed
# match, a log message is written if the cache is at least 60 seconds old.
# NOTE(review): the unpacking of $cert, the definition of $check, the
# refresh/retry after the syslog call and the final return are not visible
# in this listing.
159 sub check_cert_fingerprint
{
162 # clear cache every 30 minutes at least
163 update_cert_cache
() if time() - $cert_cache_timestamp >= 60*30;
165 # get fingerprint of server certificate
168 $fp = Net
::SSLeay
::X509_get_fingerprint
($cert, 'sha256');
170 return 0 if $@ || !defined($fp) || $fp eq ''; # error
# compare against every pinned fingerprint
173 for my $expected (keys %$cert_cache_fingerprints) {
174 return 1 if $fp eq $expected;
179 return 1 if $check->();
181 # clear cache and retry at most once every minute
182 if (time() - $cert_cache_timestamp >= 60) {
183 syslog
('info', "Could not verify remote node certificate '$fp' with list of pinned certificates, refreshing cache");
# File-scoped paths used by update_ssh_keys: the global known-hosts file it
# writes, root's authorized_keys file it rewrites, and root's own public key.
191 my $sshglobalknownhosts = "/etc/ssh/ssh_known_hosts2";
192 my $rootsshauthkeys = "/root/.ssh/authorized_keys";
193 my $ssh_rsa_id = "/root/.ssh/id_rsa.pub";
# Regenerate the SSH trust files from the cluster configuration in $cinfo.
#
# 1. Writes $sshglobalknownhosts with two "ssh-rsa <hostrsapubkey>" lines
#    per node in $cinfo->{ids}: one keyed by the node's IP, one by its name.
# 2. Builds the content of root's authorized_keys from root's own public
#    key (if that file exists), one "ssh-rsa <rootrsapubkey> root@<name>"
#    line per node, and the existing authorized_keys file (read with a
#    128 KiB limit), then writes the result back with mode 0600.
#
# Non-comment lines that look like ssh-rsa/ssh-dsa keys are tracked in
# $vhash by their key field (capture group 3), and a repeated key is
# skipped.
# NOTE(review): the unpacking of $cinfo, the initialisation of $data,
# $newdata and $vhash, and how $pub/$mykey are appended are not visible in
# this listing.
195 sub update_ssh_keys
{
199 foreach my $node (values %{$cinfo->{ids
}}) {
200 $data .= "$node->{ip} ssh-rsa $node->{hostrsapubkey}\n";
201 $data .= "$node->{name} ssh-rsa $node->{hostrsapubkey}\n";
204 PVE
::Tools
::file_set_contents
($sshglobalknownhosts, $data);
209 if (-f
$ssh_rsa_id) {
210 my $pub = PVE
::Tools
::file_get_contents
($ssh_rsa_id);
215 foreach my $node (values %{$cinfo->{ids
}}) {
216 $data .= "ssh-rsa $node->{rootrsapubkey} root\@$node->{name}\n";
219 if (-f
$rootsshauthkeys) {
220 my $mykey = PVE
::Tools
::file_get_contents
($rootsshauthkeys, 128*1024);
227 my @lines = split(/\n/, $data);
228 foreach my $line (@lines) {
229 if ($line !~ /^#/ && $line =~ m/(^|\s)ssh-(rsa|dsa)\s+(\S+)\s+\S+$/) {
# post-increment: true (skip) from the second occurrence of a key on
230 next if $vhash->{$3}++;
232 $newdata .= "$line\n";
235 PVE
::Tools
::file_set_contents
($rootsshauthkeys, $newdata, 0600);
# File-scoped directories for configuration syncing: the local config
# directory and the "master" subdirectory used as $syncdir by the code below.
238 my $cfgdir = '/etc/pmg';
239 my $syncdir = "$cfgdir/master";
# Move a file from the sync directory into place if its content changed.
#
# Arguments:
#   $filename - name relative to $syncdir (source) and, by default, $cfgdir
#   $dstfn    - optional destination path; defaults to "$cfgdir/$filename"
#
# Returns 0 when source and destination content (each read with a 1 MiB
# limit) are equal. Otherwise the source is renamed over the destination,
# a message is printed to STDERR, and the function dies if rename fails.
# NOTE(review): the die message names 'cond_rename_file', which differs from
# this variable's name. Checks between the visible lines (e.g. for missing
# files) and the success return value are not visible in this listing.
241 my $cond_commit_synced_file = sub {
242 my ($filename, $dstfn) = @_;
244 $dstfn = "$cfgdir/$filename" if !defined($dstfn);
245 my $srcfn = "$syncdir/$filename";
252 my $new = PVE
::Tools
::file_get_contents
($srcfn, 1024*1024);
255 my $old = PVE
::Tools
::file_get_contents
($dstfn, 1024*1024);
# identical content: nothing to commit
256 return 0 if $new eq $old;
259 rename($srcfn, $dstfn) ||
260 die "cond_rename_file '$filename' failed - $!\n";
262 print STDERR
"updated $dstfn\n";
# Build an rsync command line (array reference) that runs over ssh.
#
# Arguments:
#   $host_key_alias - when true, added to the ssh options as HostKeyAlias
#   @args           - further rsync arguments, appended after '-q'
#
# ssh is invoked as user root with BatchMode=yes; the complete ssh command
# is passed to rsync as a single '--rsh=...' argument.
# NOTE(review): the return statement is not visible in this listing;
# callers use the result as an array reference.
267 my $rsync_command = sub {
268 my ($host_key_alias, @args) = @_;
270 my $ssh_cmd = '--rsh=ssh -l root -o BatchMode=yes';
271 $ssh_cmd .= " -o HostKeyAlias=${host_key_alias}" if $host_key_alias;
273 my $cmd = ['rsync', $ssh_cmd, '-q', @args];
# Fetch the quarantine files listed in $flistname from a remote node.
#
# Arguments:
#   $host_ip   - address used as the rsync source host
#   $host_name - passed to $rsync_command as the SSH host key alias
#   $flistname - file handed to rsync via '--files-from'
#
# Source and destination are both $PMG::MailQueue::spooldir (remote and
# local respectively); rsync runs with a 10 second '--timeout'.
278 sub sync_quarantine_files
{
279 my ($host_ip, $host_name, $flistname) = @_;
281 my $spooldir = $PMG::MailQueue
::spooldir
;
283 my $cmd = $rsync_command->(
284 $host_name, '--timeout', '10', "${host_ip}:$spooldir", $spooldir,
285 '--files-from', $flistname);
287 PVE
::Tools
::run_command
($cmd);
# Mirror the spam/virus spool directories of cluster node $rcid from a
# remote host.
#
# Arguments: $host_ip (rsync source host), $host_name (SSH host key alias),
# $rcid (cluster node id selecting "$spooldir/cluster/$rcid").
#
# rsync runs in archive mode with a 10 second timeout; only 'spam/' and
# 'virus/' and up to two levels below them are included, everything else is
# excluded.
# NOTE(review): the enclosing 'sub' line is not visible in this listing;
# presumably this is sync_spooldir, which is called elsewhere in the file
# with (ip, name, rcid) - confirm against the full file.
291 my ($host_ip, $host_name, $rcid) = @_;
293 my $spooldir = $PMG::MailQueue
::spooldir
;
# make sure the parent directory exists (result of mkdir is not checked)
295 mkdir "$spooldir/cluster/";
296 my $syncdir = "$spooldir/cluster/$rcid";
299 my $cmd = $rsync_command->(
300 $host_name, '-aq', '--timeout', '10', "${host_ip}:$syncdir/", $syncdir);
302 foreach my $incl (('spam/', 'spam/*', 'spam/*/*', 'virus/', 'virus/*', 'virus/*/*')) {
303 push @$cmd, '--include', $incl;
306 push @$cmd, '--exclude', '*';
308 PVE
::Tools
::run_command
($cmd);
# Copy the whole "$spooldir/cluster/" tree from a remote host.
#
# Arguments: $host_ip (rsync source host), $host_name (SSH host key alias).
# rsync runs in archive mode with a 10 second timeout, using the same path
# on the remote and the local side.
# NOTE(review): lines between the visible statements are missing from this
# listing - confirm nothing else happens before the rsync call.
311 sub sync_master_quar
{
312 my ($host_ip, $host_name) = @_;
314 my $spooldir = $PMG::MailQueue
::spooldir
;
316 my $syncdir = "$spooldir/cluster/";
319 my $cmd = $rsync_command->(
320 $host_name, '-aq', '--timeout', '10', "${host_ip}:$syncdir", $syncdir);
322 PVE
::Tools
::run_command
($cmd);
# Pull the configuration from the cluster master and commit changed files.
#
# Arguments:
#   $master_name - used as the SSH host key alias for rsync
#   $master_ip   - rsync source host; must match the master IP recorded in
#                  the synced cluster.conf
#   $noreload    - not used in the visible lines
#
# Visible steps:
#   1. empty $syncdir (keeping the directory itself)
#   2. rsync "$cfgdir/*" and SpamAssassin's custom.cf from the master,
#      excluding at least pmg-api.pem and pmg-tls.pem
#   3. parse the synced cluster.conf and die unless the remote host is the
#      master there and the local node is not itself the master
#   4. commit cluster.conf, every name in @$files, custom.cf and pmg.conf
#      via $cond_commit_synced_file; a changed custom.cf sets
#      $force_restart->{spam}
#   5. call rewrite_config(1, $force_restart) on a new PMG::Config object
# NOTE(review): the rest of the rsync argument list, the condition on the
# "not part of cluster" die and the definition of $files are not visible in
# this listing.
325 sub sync_config_from_master
{
326 my ($master_name, $master_ip, $noreload) = @_;
329 File
::Path
::remove_tree
($syncdir, {keep_root
=> 1});
331 my $sa_conf_dir = "/etc/mail/spamassassin";
332 my $sa_custom_cf = "custom.cf";
334 my $cmd = $rsync_command->(
335 $master_name, '-lpgoq',
336 "${master_ip}:$cfgdir/* ${sa_conf_dir}/${sa_custom_cf}",
340 '--exclude', 'pmg-api.pem',
341 '--exclude', 'pmg-tls.pem',
344 my $errmsg = "syncing master configuration from '${master_ip}' failed";
345 PVE
::Tools
::run_command
($cmd, errmsg
=> $errmsg);
347 # verify that the remote host is cluster master
348 open (my $fh, '<', "$syncdir/cluster.conf") ||
349 die "unable to open synced cluster.conf - $!\n";
351 my $cinfo = PMG
::ClusterConfig
::read_cluster_conf
('cluster.conf', $fh);
353 if (!$cinfo->{master
} || ($cinfo->{master
}->{ip
} ne $master_ip)) {
354 die "host '$master_ip' is not cluster master\n";
# '-' stands in for a missing node type
357 my $role = $cinfo->{'local'}->{type
} // '-';
358 die "local node '$cinfo->{local}->{name}' not part of cluster\n"
361 die "local node '$cinfo->{local}->{name}' is new cluster master\n"
362 if $role eq 'master';
364 $cond_commit_synced_file->('cluster.conf');
374 foreach my $filename (@$files) {
375 $cond_commit_synced_file->($filename);
378 my $force_restart = {};
# custom.cf lives outside $cfgdir, so its destination is given explicitly
380 if ($cond_commit_synced_file->($sa_custom_cf, "${sa_conf_dir}/${sa_custom_cf}")) {
381 $force_restart->{spam
} = 1;
384 $cond_commit_synced_file->('pmg.conf');
386 my $cfg = PMG
::Config-
>new();
388 $cfg->rewrite_config(1, $force_restart);
# Replace the local rule database with a copy of the master's when it
# differs.
#
# Arguments:
#   $ldb    - local database handle (target)
#   $rdb    - remote database handle (source)
#   $ni     - node info hash; 'fingerprint' pins the API connection and
#             'ip' is used in log messages
#   $ticket - not used in the visible lines (presumably API credentials)
#
# Fetches "/config/ruledb/digest" through the API client and returns early
# when it equals the local rule cache digest. Otherwise the tables Rule,
# RuleGroup, ObjectGroup, Object and Attribut are emptied locally and copied
# from the remote database, and three id sequences are set to max(id)+1.
# NOTE(review): the remaining API client options, the local transaction
# handling and the error handling around the copy are not visible in this
# listing.
391 sub sync_ruledb_from_master
{
392 my ($ldb, $rdb, $ni, $ticket) = @_;
394 my $ruledb = PMG
::RuleDB-
>new($ldb);
395 my $rulecache = PMG
::RuleCache-
>new($ruledb);
397 my $conn = PVE
::APIClient
::LWP-
>new(
399 cookie_name
=> 'PMGAuthCookie',
401 cached_fingerprints
=> {
402 $ni->{fingerprint
} => 1,
405 my $digest = $conn->get("/config/ruledb/digest", {});
407 return if $digest eq $rulecache->{digest
}; # no changes
409 syslog
('info', "detected rule database changes - starting sync from '$ni->{ip}'");
414 $ldb->do("DELETE FROM Rule");
415 $ldb->do("DELETE FROM RuleGroup");
416 $ldb->do("DELETE FROM ObjectGroup");
417 $ldb->do("DELETE FROM Object");
418 $ldb->do("DELETE FROM Attribut");
423 # read a consistent snapshot
424 $rdb->do("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
426 PMG
::DBTools
::copy_table
($ldb, $rdb, "Rule");
427 PMG
::DBTools
::copy_table
($ldb, $rdb, "RuleGroup");
428 PMG
::DBTools
::copy_table
($ldb, $rdb, "ObjectGroup");
429 PMG
::DBTools
::copy_table
($ldb, $rdb, "Object", 'value');
430 PMG
::DBTools
::copy_table
($ldb, $rdb, "Attribut", 'value');
433 $rdb->rollback; # end transaction
# move the local id sequences past the highest copied id
439 $ldb->do("SELECT setval('rule_id_seq', max(id)+1) FROM Rule");
440 $ldb->do("SELECT setval('object_id_seq', max(id)+1) FROM Object");
441 $ldb->do("SELECT setval('objectgroup_id_seq', max(id)+1) FROM ObjectGroup");
450 syslog
('info', "finished rule database sync from host '$ni->{ip}'");
# Copy new quarantine records and status changes of cluster node $ni->{cid}
# from the remote database into the local one.
#
# Arguments:
#   $ldb           - local database handle (target)
#   $rdb           - remote database handle (source)
#   $ni            - node info hash; 'cid', 'ip' and 'name' are used
#   $rsynctime_ref - scalar reference; the time spent in
#                    sync_quarantine_files is added to it
#
# Part 1 (loop): reads CMailStore rows with rid above the stored
# 'lastid_CMailStore' in batches of $maxcount, writes their 'file' values to
# a list file, copies the rows, rsyncs the listed files, copies the matching
# CMSReceivers rows and stores the new highest rid. The loop ends when a
# batch was not full or $mscount reaches $maxmails.
#
# Part 2 (eval): applies the status of remote CMSReceivers rows changed
# since 'lastmt_CMSReceivers' (status != 'N') to the local rows, then stores
# the remote time taken at the start as the new 'lastmt_CMSReceivers'.
# NOTE(review): the definitions of $maxcount, $count, $mscount, $maxid and
# $callback, the transaction handling and the handling of $@ are not
# visible in this listing.
453 sub sync_quarantine_db
{
454 my ($ldb, $rdb, $ni, $rsynctime_ref) = @_;
456 my $rcid = $ni->{cid
};
458 my $maxmails = 100000;
462 my $ctime = PMG
::DBTools
::get_remote_time
($rdb);
468 PMG
::DBTools
::create_clusterinfo_default
($ldb, $rcid, 'lastid_CMailStore', -1, undef);
470 do { # get new values
# NOTE(review): predictable file name in /tmp (process id only) - consider
# a securely created temporary file
474 my $flistname = "/tmp/quarantinefilelist.$$";
479 open(my $flistfh, '>', $flistname) ||
480 die "unable to open file '$flistname' - $!\n";
482 my $lastid = PMG
::DBTools
::read_int_clusterinfo
($ldb, $rcid, 'lastid_CMailStore');
486 my $sth = $rdb->prepare(
487 "SELECT * from CMailstore WHERE cid = ? AND rid > ? " .
488 "ORDER BY cid,rid LIMIT ?");
489 $sth->execute($rcid, $lastid, $maxcount);
494 $maxid = $ref->{rid
};
495 print $flistfh "$ref->{file}\n";
498 my $attrs = [qw(cid rid time qtype bytes spamlevel info sender header file)];
499 $count += PMG
::DBTools
::copy_selected_data
($ldb, $sth, 'CMailStore', $attrs, $callback);
# time only the file transfer and add it to the caller's counter
503 my $starttime = [ gettimeofday
() ];
504 sync_quarantine_files
($ni->{ip
}, $ni->{name
}, $flistname);
505 $$rsynctime_ref += tv_interval
($starttime);
# receivers belonging to the mail rows copied in this batch
510 $sth = $rdb->prepare(
511 "SELECT * from CMSReceivers WHERE " .
512 "CMailStore_CID = ? AND CMailStore_RID > ? " .
513 "AND CMailStore_RID <= ?");
514 $sth->execute($rcid, $lastid, $maxid);
516 $attrs = [qw(cmailstore_cid cmailstore_rid pmail receiver ticketid status mtime)];
517 PMG
::DBTools
::copy_selected_data
($ldb, $sth, 'CMSReceivers', $attrs);
519 PMG
::DBTools
::write_maxint_clusterinfo
($ldb, $rcid, 'lastid_CMailStore', $maxid);
535 last if $mscount >= $maxmails;
537 } while ($count >= $maxcount);
539 PMG
::DBTools
::create_clusterinfo_default
($ldb, $rcid, 'lastmt_CMSReceivers', 0, undef);
541 eval { # synchronize status updates
544 my $lastmt = PMG
::DBTools
::read_int_clusterinfo
($ldb, $rcid, 'lastmt_CMSReceivers');
546 my $sth = $rdb->prepare ("SELECT * from CMSReceivers WHERE mtime >= ? AND status != 'N'");
547 $sth->execute($lastmt);
549 my $update_sth = $ldb->prepare(
550 "UPDATE CMSReceivers SET status = ? WHERE " .
551 "CMailstore_CID = ? AND CMailstore_RID = ? AND PMail = ?;");
552 while (my $ref = $sth->fetchrow_hashref()) {
553 $update_sth->execute($ref->{status
}, $ref->{cmailstore_cid
},
554 $ref->{cmailstore_rid
}, $ref->{pmail
});
557 PMG
::DBTools
::write_maxint_clusterinfo
($ldb, $rcid, 'lastmt_CMSReceivers', $ctime);
# Copy new statistic records of cluster node $ni->{cid} from the remote
# database into the local one.
#
# Arguments: $ldb (local handle, target), $rdb (remote handle, source),
# $ni (node info hash; only 'cid' is used in the visible lines).
#
# Loop: reads CStatistic rows with rid above the stored 'lastid_CStatistic'
# in batches of $maxcount, copies them, copies the CReceivers rows belonging
# to the same rid range and stores the new highest rid. The loop ends when a
# batch was not full or $mscount reaches $maxmails.
# NOTE(review): the definitions of $maxcount, $count, $mscount, $maxid and
# $callback and any transaction handling are not visible in this listing.
569 sub sync_statistic_db
{
570 my ($ldb, $rdb, $ni) = @_;
572 my $rcid = $ni->{cid
};
574 my $maxmails = 100000;
582 PMG
::DBTools
::create_clusterinfo_default
(
583 $ldb, $rcid, 'lastid_CStatistic', -1, undef);
585 do { # get new values
592 my $lastid = PMG
::DBTools
::read_int_clusterinfo
(
593 $ldb, $rcid, 'lastid_CStatistic');
597 my $sth = $rdb->prepare(
598 "SELECT * from CStatistic " .
599 "WHERE cid = ? AND rid > ? " .
600 "ORDER BY cid, rid LIMIT ?");
601 $sth->execute($rcid, $lastid, $maxcount);
606 $maxid = $ref->{rid
};
609 my $attrs = [qw(cid rid time bytes direction spamlevel ptime virusinfo sender)];
610 $count += PMG
::DBTools
::copy_selected_data
($ldb, $sth, 'CStatistic', $attrs, $callback);
# receivers belonging to the statistic rows copied in this batch
615 $sth = $rdb->prepare(
616 "SELECT * from CReceivers WHERE " .
617 "CStatistic_CID = ? AND CStatistic_RID > ? AND CStatistic_RID <= ?");
618 $sth->execute($rcid, $lastid, $maxid);
620 $attrs = [qw(cstatistic_cid cstatistic_rid blocked receiver)];
621 PMG
::DBTools
::copy_selected_data
($ldb, $sth, 'CReceivers', $attrs);
623 PMG
::DBTools
::write_maxint_clusterinfo
($ldb, $rcid, 'lastid_CStatistic', $maxid);
635 last if $mscount >= $maxmails;
637 } while ($count >= $maxcount);
# Generic driver for syncing a table by modification time.
#
# Arguments:
#   $ldb        - local database handle, passed on to $mergefunc
#   $rdb        - remote database handle the select statement runs on
#   $ni         - node info hash; 'cid' keys the stored sync state
#   $table      - table name, used in the state key "lastmt_$table"
#   $selectfunc - called as $selectfunc->($ctime, $lastmt); returns the SQL
#                 text to prepare on $rdb
#   $mergefunc  - called per fetched row as
#                 $mergefunc->($ldb, $ref, \$cnew, \$cold)
#
# Returns ($cnew, $cold), two counters that start at 0 and are only
# modified through the references handed to $mergefunc.
#
# After processing, the remote time taken at the start ($ctime) is stored as
# the new "lastmt_$table" value.
# NOTE(review): the execute call, the declaration of $count and the body of
# the "++$count >= $max" branch are not visible in this listing.
640 sub sync_generic_mtime_db
{
641 my ($ldb, $rdb, $ni, $table, $selectfunc, $mergefunc) = @_;
643 my ($cnew, $cold) = (0, 0);
645 my $ctime = PMG
::DBTools
::get_remote_time
($rdb);
647 PMG
::DBTools
::create_clusterinfo_default
($ldb, $ni->{cid
}, "lastmt_$table", 0, undef);
649 my $lastmt = PMG
::DBTools
::read_int_clusterinfo
($ldb, $ni->{cid
}, "lastmt_$table");
651 my $sql_cmd = $selectfunc->($ctime, $lastmt);
653 my $sth = $rdb->prepare($sql_cmd);
658 # use transaction to speedup things
659 my $max = 1000; # UPDATE MAX ENTRIES AT ONCE
662 while (my $ref = $sth->fetchrow_hashref()) {
663 if (++$count >= $max) {
668 $mergefunc->($ldb, $ref, \
$cnew, \
$cold);
676 PMG
::DBTools
::write_maxint_clusterinfo
($ldb, $ni->{cid
}, "lastmt_$table", $ctime);
678 return ($cnew, $cold);
# Sync greylist entries from a remote node via sync_generic_mtime_db.
#
# Arguments: $dbh (local handle), $rdb (remote handle), $ni (node info).
# Returns whatever sync_generic_mtime_db returns for table 'CGreylist'.
#
# Select: remote CGreylist rows that have not expired (extime >= $ctime),
# changed since the last sync (mtime >= $lastmt) and have CID != 0.
# Merge: each row is passed to the SQL function merge_greylist on the local
# database; its 'newcount' result is then tested.
# NOTE(review): $ctime and $lastmt are interpolated into the SQL text; they
# come from get_remote_time/read_int_clusterinfo, so presumably numeric -
# bind parameters would be safer. The execute call on $merge_sth and the
# counter updates are not visible in this listing.
681 sub sync_greylist_db
{
682 my ($dbh, $rdb, $ni) = @_;
684 my $selectfunc = sub {
685 my ($ctime, $lastmt) = @_;
686 return "SELECT * from CGreylist WHERE extime >= $ctime AND " .
687 "mtime >= $lastmt AND CID != 0";
690 my $merge_sth = $dbh->prepare(
691 "SELECT merge_greylist(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) AS newcount");
693 my $mergefunc = sub {
694 my ($ldb, $ref, $cnewref, $coldref) = @_;
697 $ref->{ipnet
}, $ref->{host
}, $ref->{sender
}, $ref->{receiver
},
698 $ref->{instance
}, $ref->{rctime
}, $ref->{extime
}, $ref->{delay
},
699 $ref->{blocked
}, $ref->{passed
}, 0, $ref->{cid
});
701 my $res = $merge_sth->fetchrow_hashref();
703 $merge_sth->finish();
705 if ($res->{newcount
}) {
712 return sync_generic_mtime_db
($dbh, $rdb, $ni, 'CGreylist', $selectfunc, $mergefunc);
# Sync user preferences from a remote node via sync_generic_mtime_db.
#
# Arguments: $dbh (local handle), $rdb (remote handle), $ni (node info).
# Returns whatever sync_generic_mtime_db returns for table 'UserPrefs'.
#
# Select: remote UserPrefs rows with mtime >= $lastmt (interpolated into the
# SQL text). Merge: upsert keyed on (PMail, Name); inserted and updated rows
# both get MTime = 0.
715 sub sync_userprefs_db
{
716 my ($dbh, $rdb, $ni) = @_;
718 my $selectfunc = sub {
719 my ($ctime, $lastmt) = @_;
721 return "SELECT * from UserPrefs WHERE mtime >= $lastmt";
# NOTE(review): in the upsert below, excluded.Data is the value proposed for
# insertion, i.e. $3, so both CASE branches yield the same value and the
# existing Data is always overwritten - confirm whether the ELSE branch was
# meant to keep UserPrefs.Data.
724 my $merge_sth = $dbh->prepare(
725 "INSERT INTO UserPrefs (PMail, Name, Data, MTime) " .
726 'VALUES ($1, $2, $3, 0) ' .
727 'ON CONFLICT (PMail, Name) DO UPDATE SET ' .
728 'MTime = 0, ' . # this is just a copy from somewhere else, not modified
729 'Data = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN excluded.Data ELSE $3 END');
731 my $mergefunc = sub {
732 my ($ldb, $ref, $cnewref, $coldref) = @_;
734 $merge_sth->execute($ref->{pmail
}, $ref->{name
}, $ref->{data
});
738 return sync_generic_mtime_db
($dbh, $rdb, $ni, 'UserPrefs', $selectfunc, $mergefunc);
# Sync per-domain statistics from a remote node via sync_generic_mtime_db.
#
# Arguments: $dbh (local handle), $rdb (remote handle), $ni (node info).
# Returns whatever sync_generic_mtime_db returns for table 'DomainStat'.
#
# Select: remote DomainStat rows with mtime >= $lastmt.
# Merge: updates the local row identified by (Time, Domain) with the remote
# counters; the visible INSERT supplies the same columns for a new row.
# NOTE(review): the UPDATE interpolates the remote row's values directly
# into the SQL text, whereas the INSERT uses placeholders - bind parameters
# would be safer and consistent. The condition that selects the INSERT
# (presumably based on $rows) and the counter updates are not visible in
# this listing.
741 sub sync_domainstat_db
{
742 my ($dbh, $rdb, $ni) = @_;
744 my $selectfunc = sub {
745 my ($ctime, $lastmt) = @_;
746 return "SELECT * from DomainStat WHERE mtime >= $lastmt";
749 my $mergefunc = sub {
750 my ($ldb, $ref, $cnewref, $coldref) = @_;
752 my $sth = $ldb->prepare (
753 "UPDATE Domainstat " .
754 "SET CountIn = $ref->{countin}, CountOut = $ref->{countout}, " .
755 "BytesIn = $ref->{bytesin}, BytesOut = $ref->{bytesout}, " .
756 "VirusIn = $ref->{virusin}, VirusOut = $ref->{virusout}, " .
757 "SpamIn = $ref->{spamin}, SpamOut = $ref->{spamout}, " .
758 "BouncesIn = $ref->{bouncesin}, BouncesOut = $ref->{bouncesout}, " .
759 "PTimeSum = $ref->{ptimesum}, MTime = $ref->{mtime} " .
760 "WHERE Time = ? AND Domain = ?");
762 $sth->execute($ref->{time}, $ref->{domain
});
# number of rows the UPDATE affected
764 my $rows = $sth->rows;
772 "INSERT INTO Domainstat " .
773 "(Time,Domain,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut," .
774 "BouncesIn,BouncesOut,PTimeSum,Mtime) " .
775 "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)", undef,
776 $ref->{time}, $ref->{domain
}, $ref->{countin
}, $ref->{countout
},
777 $ref->{bytesin
}, $ref->{bytesout
},
778 $ref->{virusin
}, $ref->{virusout
}, $ref->{spamin
}, $ref->{spamout
},
779 $ref->{bouncesin
}, $ref->{bouncesout
}, $ref->{ptimesum
}, $ref->{mtime
});
784 return sync_generic_mtime_db
($dbh, $rdb, $ni, 'DomainStat', $selectfunc, $mergefunc);
# Sync daily statistics from a remote node via sync_generic_mtime_db.
#
# Arguments: $dbh (local handle), $rdb (remote handle), $ni (node info).
# Returns whatever sync_generic_mtime_db returns for table 'DailyStat'.
#
# Select: remote DailyStat rows with mtime >= $lastmt.
# Merge: updates the local row (executed with the row's 'time' as the only
# bind value) with the remote counters; the visible INSERT supplies the same
# columns for a new row.
# NOTE(review): the UPDATE interpolates the remote row's values directly
# into the SQL text, whereas the INSERT uses placeholders - bind parameters
# would be safer and consistent. The WHERE clause of the UPDATE, the
# condition that selects the INSERT (presumably based on $rows) and the
# counter updates are not visible in this listing.
787 sub sync_dailystat_db
{
788 my ($dbh, $rdb, $ni) = @_;
790 my $selectfunc = sub {
791 my ($ctime, $lastmt) = @_;
792 return "SELECT * from DailyStat WHERE mtime >= $lastmt";
795 my $mergefunc = sub {
796 my ($ldb, $ref, $cnewref, $coldref) = @_;
798 my $sth = $ldb->prepare(
799 "UPDATE DailyStat " .
800 "SET CountIn = $ref->{countin}, CountOut = $ref->{countout}, " .
801 "BytesIn = $ref->{bytesin}, BytesOut = $ref->{bytesout}, " .
802 "VirusIn = $ref->{virusin}, VirusOut = $ref->{virusout}, " .
803 "SpamIn = $ref->{spamin}, SpamOut = $ref->{spamout}, " .
804 "BouncesIn = $ref->{bouncesin}, BouncesOut = $ref->{bouncesout}, " .
805 "GreylistCount = $ref->{greylistcount}, SPFCount = $ref->{spfcount}, " .
806 "RBLCount = $ref->{rblcount}, " .
807 "PTimeSum = $ref->{ptimesum}, MTime = $ref->{mtime} " .
810 $sth->execute($ref->{time});
# number of rows the UPDATE affected
812 my $rows = $sth->rows;
820 "INSERT INTO DailyStat " .
821 "(Time,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut," .
822 "BouncesIn,BouncesOut,GreylistCount,SPFCount,RBLCount,PTimeSum,Mtime) " .
823 "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", undef,
824 $ref->{time}, $ref->{countin
}, $ref->{countout
},
825 $ref->{bytesin
}, $ref->{bytesout
},
826 $ref->{virusin
}, $ref->{virusout
}, $ref->{spamin
}, $ref->{spamout
},
827 $ref->{bouncesin
}, $ref->{bouncesout
}, $ref->{greylistcount
},
828 $ref->{spfcount
}, $ref->{rblcount
}, $ref->{ptimesum
}, $ref->{mtime
});
833 return sync_generic_mtime_db
($dbh, $rdb, $ni, 'DailyStat', $selectfunc, $mergefunc);
# Sync virus statistics from a remote node via sync_generic_mtime_db.
#
# Arguments: $dbh (local handle), $rdb (remote handle), $ni (node info).
# Returns whatever sync_generic_mtime_db returns for table 'VirusInfo'.
#
# Select: remote VirusInfo rows with mtime >= $lastmt (interpolated into the
# SQL text). Merge: updates Count and MTime of the local row identified by
# (Time, Name); the visible INSERT creates the row with the same values.
# Both statements use bind parameters.
# NOTE(review): the condition that selects the INSERT (presumably based on
# $rows) and the counter updates are not visible in this listing.
836 sub sync_virusinfo_db
{
837 my ($dbh, $rdb, $ni) = @_;
839 my $selectfunc = sub {
840 my ($ctime, $lastmt) = @_;
841 return "SELECT * from VirusInfo WHERE mtime >= $lastmt";
844 my $mergefunc = sub {
845 my ($ldb, $ref, $cnewref, $coldref) = @_;
847 my $sth = $ldb->prepare(
848 "UPDATE VirusInfo SET Count = ? , MTime = ? " .
849 "WHERE Time = ? AND Name = ?");
851 $sth->execute($ref->{count
}, $ref->{mtime
}, $ref->{time}, $ref->{name
});
# number of rows the UPDATE affected
853 my $rows = $sth->rows;
860 $ldb->do ("INSERT INTO VirusInfo (Time,Name,Count,MTime) " .
861 "VALUES (?,?,?,?)", undef,
862 $ref->{time}, $ref->{name
}, $ref->{count
}, $ref->{mtime
});
867 return sync_generic_mtime_db
($dbh, $rdb, $ni, 'VirusInfo', $selectfunc, $mergefunc);
# Fetch the data of cluster nodes that no longer exist from the master.
#
# Arguments:
#   $ldb           - local database handle
#   $masterdb      - database handle of the master
#   $cinfo         - cluster config; 'ids' lists the existing nodes and
#                    $cinfo->{master}->{maxcid} bounds the id range checked
#                    (treated as 0 when undefined)
#   $masterni      - node info of the master ('ip' and 'name' are used)
#   $rsynctime_ref - scalar reference; time spent in sync_spooldir is added
#
# For every cluster id from 1 to maxcid that is not in use by a current node
# and has no ".synced-deleted-node" marker file in its spool directory, the
# spool directory, quarantine database and statistic database are synced
# from the master, and the marker file is then opened in append mode.
# NOTE(review): the construction of $fake_ni (beyond its 'ip' and 'name'
# entries) and any error handling around the sync calls are not visible in
# this listing.
870 sub sync_deleted_nodes_from_master
{
871 my ($ldb, $masterdb, $cinfo, $masterni, $rsynctime_ref) = @_;
875 my $cid_hash = {}; # fast lookup
876 foreach my $ni (values %{$cinfo->{ids
}}) {
877 $cid_hash->{$ni->{cid
}} = $ni;
880 my $spooldir = $PMG::MailQueue
::spooldir
;
882 my $maxcid = $cinfo->{master
}->{maxcid
} // 0;
884 for (my $rcid = 1; $rcid <= $maxcid; $rcid++) {
# ids still in use belong to live nodes and are handled elsewhere
885 next if $cid_hash->{$rcid};
887 my $done_marker = "$spooldir/cluster/$rcid/.synced-deleted-node";
889 next if -f
$done_marker; # already synced
891 syslog
('info', "syncing deleted node $rcid from master '$masterni->{ip}'");
893 my $starttime = [ gettimeofday
() ];
894 sync_spooldir
($masterni->{ip
}, $masterni->{name
}, $rcid);
895 $$rsynctime_ref += tv_interval
($starttime);
898 ip
=> $masterni->{ip
},
899 name
=> $masterni->{name
},
903 sync_quarantine_db
($ldb, $masterdb, $fake_ni);
905 sync_statistic_db
($ldb, $masterdb, $fake_ni);
# NOTE(review): the result of open() is not checked; if the marker cannot
# be created, this node id will be synced again on the next run
907 open(my $fh, ">>", $done_marker);