8 use Time
::HiRes qw
(gettimeofday tv_interval
);
13 use PVE
::APIClient
::LWP
;
17 use PMG
::ClusterConfig
;
24 my ($nodename, $noerr) = @_;
26 my $cinfo = PMG
::ClusterConfig-
>new();
28 foreach my $entry (values %{$cinfo->{ids
}}) {
29 if ($entry->{name
} eq $nodename) {
30 my $ip = $entry->{ip
};
31 return $ip if !wantarray;
32 my $family = PVE
::Tools
::get_host_address_family
($ip);
33 return ($ip, $family);
37 # fallback: try to get IP by other means
38 return PMG
::Utils
::lookup_node_ip
($nodename, $noerr);
44 $cinfo = PMG
::ClusterConfig-
>new() if !$cinfo;
46 return $cinfo->{master
}->{name
} if defined($cinfo->{master
});
# Read the local API certificate and return its SHA-256 fingerprint.
#
# Takes no arguments; the certificate path is fixed.
#
# Returns the fingerprint string as produced by
# Net::SSLeay::X509_get_fingerprint($cert, 'sha256').
#
# Dies with a message naming the certificate path when the file cannot be
# opened or parsed, or when the computed fingerprint is undefined or empty.
sub read_local_ssl_cert_fingerprint {
    my $cert_path = "/etc/pmg/pmg-api.pem";

    my $cert;
    eval {
        # a failed open yields a false handle; do not hand that to the PEM reader
        my $bio = Net::SSLeay::BIO_new_file($cert_path, 'r')
            or die "unable to open file - $!\n";
        $cert = Net::SSLeay::PEM_read_bio_X509($bio);
        Net::SSLeay::BIO_free($bio);
    };
    if (my $err = $@) {
        die "unable to read certificate '$cert_path' - $err\n";
    }

    # the X509 handle is a plain integer, so a NULL pointer shows up as 0
    # (defined but false) - test for truth, not definedness
    if (!$cert) {
        die "unable to read certificate '$cert_path' - got empty value\n";
    }

    my $fp;
    eval {
        $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256');
    };
    my $err = $@;

    # release the certificate on every path, success or failure
    Net::SSLeay::X509_free($cert);

    if ($err) {
        die "unable to get fingerprint for '$cert_path' - $err\n";
    }

    if (!defined($fp) || $fp eq '') {
        die "unable to get fingerprint for '$cert_path' - got empty value\n";
    }

    return $fp;
}
# SSH key files used by read_local_cluster_info(): the host's public RSA key,
# and root's RSA key pair (the pair is generated there if the public half is
# missing).
83 my $hostrsapubkey_fn = '/etc/ssh/ssh_host_rsa_key.pub';
84 my $rootrsakey_fn = '/root/.ssh/id_rsa';
85 my $rootrsapubkey_fn = '/root/.ssh/id_rsa.pub';
# Read the first line of an RSA public key file and return only the base64
# key material (key type prefix and trailing "root@host" comment removed).
#
# Dies with "unable to parse <file>" when the file yields no line or the
# remaining text does not look like a base64 key blob.
my $read_rsa_pubkey = sub {
    my ($filename) = @_;

    my $key = PVE::Tools::file_read_firstline($filename);

    # no line read (e.g. missing file): fail cleanly instead of running the
    # substitutions below on undef
    die "unable to parse ${filename}\n" if !defined($key);

    $key =~ s/^.*ssh-rsa\s+//i;
    $key =~ s/\s+root\@\S+\s*$//i;

    # base64 output is padded with up to two '=' when the blob length is not
    # a multiple of three (the case for e.g. 3072 bit RSA keys), so accept it
    die "unable to parse ${filename}\n"
        if $key !~ m/^[A-Za-z0-9\.\/\+]{200,}={0,2}$/;

    return $key;
};

# Collect the information describing the local node.
#
# Returns a hash reference with the keys:
#   name          - local node name (PVE::INotify::nodename)
#   ip            - address looked up for that name
#   hostrsapubkey - base64 blob of the host's public RSA key
#   rootrsapubkey - base64 blob of root's public RSA key
#   fingerprint   - fingerprint of the local API certificate
#
# Side effect: when root has no public RSA key yet, a new 2048 bit key pair
# is generated with ssh-keygen (any stale private key is removed first).
#
# Dies when a key file cannot be parsed; errors raised by the helpers it
# calls are not caught here.
sub read_local_cluster_info {
    my $res = {};

    my $hostrsapubkey = $read_rsa_pubkey->($hostrsapubkey_fn);

    my $nodename = PVE::INotify::nodename();

    $res->{name} = $nodename;

    $res->{ip} = PMG::Utils::lookup_node_ip($nodename);

    $res->{hostrsapubkey} = $hostrsapubkey;

    if (! -f $rootrsapubkey_fn) {
        # ssh-keygen refuses to overwrite an existing private key file
        unlink $rootrsakey_fn;
        my $cmd = ['ssh-keygen', '-t', 'rsa', '-N', '', '-b', '2048',
                   '-f', $rootrsakey_fn];
        PMG::Utils::run_silent_cmd($cmd);
    }

    $res->{rootrsapubkey} = $read_rsa_pubkey->($rootrsapubkey_fn);

    $res->{fingerprint} = read_local_ssl_cert_fingerprint();

    return $res;
}
127 # X509 Certificate cache helper
# State filled by update_cert_cache():
#   $cert_cache_nodes        - node name => certificate fingerprint
#   $cert_cache_timestamp    - time() of the last cache refresh
#   $cert_cache_fingerprints - set of known fingerprints (fingerprint => 1)
129 my $cert_cache_nodes = {};
130 my $cert_cache_timestamp = time();
131 my $cert_cache_fingerprints = {};
# Rebuild the certificate fingerprint cache from the cluster configuration.
#
# Resets the cache timestamp to the current time, drops all cached entries
# and re-reads every node listed in the cluster configuration. Takes no
# arguments and returns nothing useful.
sub update_cert_cache {
    $cert_cache_timestamp = time();

    $cert_cache_fingerprints = {};
    $cert_cache_nodes = {};

    my $cinfo = PMG::ClusterConfig->new();

    for my $entry (values %{$cinfo->{ids}}) {
        my ($node, $fp) = @{$entry}{qw(name fingerprint)};

        # an entry without name or fingerprint must not end up in the cache:
        # an undef key would be stored as '' and pollute the pinned set
        next if !$node || !$fp;

        $cert_cache_fingerprints->{$fp} = 1;
        $cert_cache_nodes->{$node} = $fp;
    }

    return;
}
# Only the condition of this sub is visible in this excerpt: it is true when
# $node is defined and has no entry in $cert_cache_nodes yet.
# NOTE(review): the guarded statement is presumably a call to
# update_cert_cache(), with $node taken from @_ - confirm against the full file.
152 # load and cache cert fingerprint once
153 sub initialize_cert_cache
{
157 if defined($node) && !defined($cert_cache_nodes->{$node});
# Compare the SHA-256 fingerprint of $cert with the pinned fingerprints held
# in $cert_cache_fingerprints.
#
# Returns 0 when the fingerprint cannot be computed and 1 on a match.
# NOTE(review): the parameter list, the definition of $check and the code
# following the log message are not visible in this excerpt; the result for
# a mismatch is presumably false - confirm.
160 sub check_cert_fingerprint
{
163 # clear cache every 30 minutes at least
164 update_cert_cache
() if time() - $cert_cache_timestamp >= 60*30;
166 # get fingerprint of server certificate
169 $fp = Net
::SSLeay
::X509_get_fingerprint
($cert, 'sha256');
171 return 0 if $@ || !defined($fp) || $fp eq ''; # error
# accept the certificate if its fingerprint equals any cached one
174 for my $expected (keys %$cert_cache_fingerprints) {
175 return 1 if $fp eq $expected;
180 return 1 if $check->();
182 # clear cache and retry at most once every minute
183 if (time() - $cert_cache_timestamp >= 60) {
184 syslog
('info', "Could not verify remote node certificate '$fp' with list of pinned certificates, refreshing cache");
# Files handled by update_ssh_keys(): the global known-hosts file, root's
# authorized_keys file and root's public RSA key.
# NOTE(review): $ssh_rsa_id holds the same path as $rootrsapubkey_fn.
192 my $sshglobalknownhosts = "/etc/ssh/ssh_known_hosts2";
193 my $rootsshauthkeys = "/root/.ssh/authorized_keys";
194 my $ssh_rsa_id = "/root/.ssh/id_rsa.pub";
# Regenerate the SSH trust files from the node list in $cinfo->{ids}:
# the global known-hosts file and root's authorized_keys file.
# NOTE(review): the parameter list, the variable declarations and the
# conditions attached to both file_set_contents() calls are not visible in
# this excerpt.
196 sub update_ssh_keys
{
# known hosts: two lines per node, keyed by IP address and by node name
202 foreach my $node (values %{$cinfo->{ids
}}) {
203 $data .= "$node->{ip} ssh-rsa $node->{hostrsapubkey}\n";
204 $data .= "$node->{name} ssh-rsa $node->{hostrsapubkey}\n";
# previous file content, read only if the file exists (at most 1 MiB)
207 $old = PVE
::Tools
::file_get_contents
($sshglobalknownhosts, 1024*1024)
208 if -f
$sshglobalknownhosts;
210 PVE
::Tools
::file_set_contents
($sshglobalknownhosts, $data)
217 if (-f
$ssh_rsa_id) {
218 my $pub = PVE
::Tools
::file_get_contents
($ssh_rsa_id);
# authorized keys: one root key line per cluster node
223 foreach my $node (values %{$cinfo->{ids
}}) {
224 $data .= "ssh-rsa $node->{rootrsapubkey} root\@$node->{name}\n";
227 if (-f
$rootsshauthkeys) {
228 my $mykey = PVE
::Tools
::file_get_contents
($rootsshauthkeys, 128*1024);
# drop duplicate keys: $3 is the key material of a matching line, and a
# line whose key material was already counted in $vhash is skipped
235 my @lines = split(/\n/, $data);
236 foreach my $line (@lines) {
237 if ($line !~ /^#/ && $line =~ m/(^|\s)ssh-(rsa|dsa)\s+(\S+)\s+\S+$/) {
238 next if $vhash->{$3}++;
240 $newdata .= "$line\n";
243 $old = PVE
::Tools
::file_get_contents
($rootsshauthkeys, 1024*1024)
244 if -f
$rootsshauthkeys;
# written with mode 0600
246 PVE
::Tools
::file_set_contents
($rootsshauthkeys, $newdata, 0600)
# $cfgdir is the live configuration directory; $syncdir is the sub-directory
# into which sync_config_from_master() rsyncs the master's files before they
# are committed.
250 my $cfgdir = '/etc/pmg';
251 my $syncdir = "$cfgdir/master";
# Move a synced file from $syncdir into place if its content changed.
#
# $filename - path relative to $syncdir
# $dstfn    - destination path; defaults to "$cfgdir/$filename"
#
# Returns 0 when source and destination have identical content. Otherwise the
# source is renamed over the destination (dies if rename fails) and a notice
# is printed to STDERR.
# NOTE(review): the existence checks around the reads and the return value
# after a successful update are not visible here; a caller tests the result
# for truth, so it is presumably 1 - confirm.
253 my $cond_commit_synced_file = sub {
254 my ($filename, $dstfn) = @_;
256 $dstfn = "$cfgdir/$filename" if !defined($dstfn);
257 my $srcfn = "$syncdir/$filename";
# both files are read with a 1 MiB limit
264 my $new = PVE
::Tools
::file_get_contents
($srcfn, 1024*1024);
267 my $old = PVE
::Tools
::file_get_contents
($dstfn, 1024*1024);
268 return 0 if $new eq $old;
271 # set mtime (touch) to avoid time drift problems
272 utime(undef, undef, $srcfn);
274 rename($srcfn, $dstfn) ||
275 die "cond_rename_file '$filename' failed - $!\n";
277 print STDERR
"updated $dstfn\n";
# Build the argument list for an rsync call that runs over ssh as root in
# batch mode.
#
# $host_key_alias - if true, passed to ssh as HostKeyAlias
# @args           - appended verbatim after the fixed options
#
# Returns an array reference: 'rsync', the --rsh option, '-q', then @args.
my $rsync_command = sub {
    my ($host_key_alias, @args) = @_;

    my @rsh_parts = ('--rsh=ssh -l root -o BatchMode=yes');
    push @rsh_parts, "-o HostKeyAlias=${host_key_alias}" if $host_key_alias;

    return ['rsync', join(' ', @rsh_parts), '-q', @args];
};
# Copy the quarantine files named in a file list from a remote node.
#
# $host_ip   - address used as rsync source host
# $host_name - passed to $rsync_command as ssh host key alias
# $flistname - file with the paths to transfer (rsync --files-from)
# $rcid      - cluster id of the remote node
#
# The list is applied relative to the spool directory on both sides; rsync
# runs with a 10 second I/O timeout.
# NOTE(review): lines between the $syncdir declaration and the rsync call
# are not visible here; $syncdir is presumably created there - confirm.
293 sub sync_quarantine_files
{
294 my ($host_ip, $host_name, $flistname, $rcid) = @_;
296 my $spooldir = $PMG::MailQueue
::spooldir
;
# failure (e.g. directory already exists) is ignored
298 mkdir "$spooldir/cluster/";
299 my $syncdir = "$spooldir/cluster/$rcid";
302 my $cmd = $rsync_command->(
303 $host_name, '--timeout', '10', "[${host_ip}]:$spooldir", $spooldir,
304 '--files-from', $flistname);
306 PVE
::Tools
::run_command
($cmd);
310 my ($host_ip, $host_name, $rcid) = @_;
312 my $spooldir = $PMG::MailQueue
::spooldir
;
314 mkdir "$spooldir/cluster/";
315 my $syncdir = "$spooldir/cluster/$rcid";
318 my $cmd = $rsync_command->(
319 $host_name, '-aq', '--timeout', '10', "[${host_ip}]:$syncdir/", $syncdir);
321 foreach my $incl (('spam/', 'spam/*', 'spam/*/*', 'virus/', 'virus/*', 'virus/*/*')) {
322 push @$cmd, '--include', $incl;
325 push @$cmd, '--exclude', '*';
327 PVE
::Tools
::run_command
($cmd);
# Mirror the complete cluster quarantine directory ("$spooldir/cluster/")
# from the given host into the same local path, using rsync in archive mode
# with a 10 second I/O timeout.
#
# $host_ip   - address used as rsync source host
# $host_name - passed to $rsync_command as ssh host key alias
#
# NOTE(review): a line between the $syncdir declaration and the rsync call is
# not visible in this excerpt.
330 sub sync_master_quar
{
331 my ($host_ip, $host_name) = @_;
333 my $spooldir = $PMG::MailQueue
::spooldir
;
335 my $syncdir = "$spooldir/cluster/";
338 my $cmd = $rsync_command->(
339 $host_name, '-aq', '--timeout', '10', "[${host_ip}]:$syncdir", $syncdir);
341 PVE
::Tools
::run_command
($cmd);
# Fetch the configuration of the cluster master and activate what changed.
#
# $master_name - node name of the master
# $master_ip   - address of the master; also checked against the synced
#                cluster.conf
# $noreload    - NOTE(review): its use is not visible in this excerpt
#
# Steps visible here: empty the staging directory, rsync the master's config
# directory plus the SpamAssassin custom.cf into it, verify that the synced
# cluster.conf names $master_ip as master and that the local node is not
# itself master, then commit the synced files, rewrite the SSH keys, disable
# fetchmail, and mirror the template directory.
# Dies when the rsync fails, the synced cluster.conf cannot be opened, or a
# role check fails.
344 sub sync_config_from_master
{
345 my ($master_name, $master_ip, $noreload) = @_;
# remove everything below the staging directory but keep the directory
348 File
::Path
::remove_tree
($syncdir, {keep_root
=> 1});
350 my $sa_conf_dir = "/etc/mail/spamassassin";
351 my $sa_custom_cf = "custom.cf";
# NOTE(review): several arguments of this call are not visible; the visible
# ones exclude the staging directory itself and the node-local certificates
353 my $cmd = $rsync_command->(
355 "[${master_ip}]:$cfgdir/* ${sa_conf_dir}/${sa_custom_cf}",
357 '--exclude', 'master/',
360 '--exclude', 'pmg-api.pem',
361 '--exclude', 'pmg-tls.pem',
364 my $errmsg = "syncing master configuration from '${master_ip}' failed";
365 PVE
::Tools
::run_command
($cmd, errmsg
=> $errmsg);
367 # verify that the remote host is cluster master
368 open (my $fh, '<', "$syncdir/cluster.conf") ||
369 die "unable to open synced cluster.conf - $!\n";
371 my $cinfo = PMG
::ClusterConfig
::read_cluster_conf
('cluster.conf', $fh);
373 if (!$cinfo->{master
} || ($cinfo->{master
}->{ip
} ne $master_ip)) {
374 die "host '$master_ip' is not cluster master\n";
# role of the local node according to the synced config; '-' if unset
377 my $role = $cinfo->{'local'}->{type
} // '-';
378 die "local node '$cinfo->{local}->{name}' not part of cluster\n"
381 die "local node '$cinfo->{local}->{name}' is new cluster master\n"
382 if $role eq 'master';
384 $cond_commit_synced_file->('cluster.conf');
386 update_ssh_keys
($cinfo); # rewrite ssh keys
388 PMG
::Fetchmail
::update_fetchmail_default
(0); # disable on slave
# NOTE(review): the definition of $files is not visible in this excerpt
403 foreach my $filename (@$files) {
404 $cond_commit_synced_file->($filename);
408 my $force_restart = {};
# a changed SpamAssassin custom.cf marks the 'spam' entry for restart
410 if ($cond_commit_synced_file->($sa_custom_cf, "${sa_conf_dir}/${sa_custom_cf}")) {
411 $force_restart->{spam
} = 1;
414 $cond_commit_synced_file->('pmg.conf');
416 # sync user templates files/symlinks (not recursive)
417 my $srcdir = "$syncdir/templates";
419 my $dstdir = "$cfgdir/templates";
# NOTE(review): the branch choosing between the commit and the plain rename
# below is not visible here
422 foreach my $fn (<$srcdir/*>) {
423 next if $fn !~ m
|^($srcdir/(.*))$|;
426 $names_hash->{$name} = 1;
427 my $target = "$dstdir/$name";
429 $cond_commit_synced_file->("templates/$name", $target);
431 warn "update $target failed - $!\n" if !rename($fn, $target);
434 # remove vanished files
435 foreach my $fn (<$dstdir/*>) {
436 next if $fn !~ m
|^($dstdir/(.*))$|;
439 next if $names_hash->{$name};
440 warn "unlink $fn failed - $!\n" if !unlink($fn);
# Replace the local rule database with a copy of the remote one when the
# remote digest differs from the local one.
#
# $ldb    - local database handle (target)
# $rdb    - remote database handle (source)
# $ni     - node info hash of the remote node ('ip' and 'fingerprint' are used)
# $ticket - NOTE(review): its use is not visible in this excerpt
#
# Returns early when both digests are equal.
# NOTE(review): the surrounding transaction and error handling on $ldb are
# not visible here.
445 sub sync_ruledb_from_master
{
446 my ($ldb, $rdb, $ni, $ticket) = @_;
448 my $ruledb = PMG
::RuleDB-
>new($ldb);
449 my $rulecache = PMG
::RuleCache-
>new($ruledb);
# API client; the remote certificate is pinned to the node's known fingerprint
451 my $conn = PVE
::APIClient
::LWP-
>new(
453 cookie_name
=> 'PMGAuthCookie',
455 cached_fingerprints
=> {
456 $ni->{fingerprint
} => 1,
459 my $digest = $conn->get("/config/ruledb/digest", {});
461 return if $digest eq $rulecache->{digest
}; # no changes
463 syslog
('info', "detected rule database changes - starting sync from '$ni->{ip}'");
# empty the local rule tables before copying
468 $ldb->do("DELETE FROM Rule");
469 $ldb->do("DELETE FROM RuleGroup");
470 $ldb->do("DELETE FROM ObjectGroup");
471 $ldb->do("DELETE FROM Object");
472 $ldb->do("DELETE FROM Attribut");
477 # read a consistent snapshot
478 $rdb->do("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
480 PMG
::DBTools
::copy_table
($ldb, $rdb, "Rule");
481 PMG
::DBTools
::copy_table
($ldb, $rdb, "RuleGroup");
482 PMG
::DBTools
::copy_table
($ldb, $rdb, "ObjectGroup");
483 PMG
::DBTools
::copy_table
($ldb, $rdb, "Object", 'value');
484 PMG
::DBTools
::copy_table
($ldb, $rdb, "Attribut", 'value');
# nothing was written on the remote side, so the read transaction is simply
# rolled back
487 $rdb->rollback; # end transaction
# move the local id sequences past the highest copied id
493 $ldb->do("SELECT setval('rule_id_seq', max(id)+1) FROM Rule");
494 $ldb->do("SELECT setval('object_id_seq', max(id)+1) FROM Object");
495 $ldb->do("SELECT setval('objectgroup_id_seq', max(id)+1) FROM ObjectGroup");
504 syslog
('info', "finished rule database sync from host '$ni->{ip}'");
# Copy new quarantine entries (CMailStore / CMSReceivers rows and the mail
# files they reference) of one node from the remote database, then pull
# receiver status changes.
#
# $ldb           - local database handle (target)
# $rdb           - remote database handle (source)
# $ni            - node info hash ('cid', 'ip' and 'name' are used)
# $rsynctime_ref - scalar reference; the time spent in the file transfer is
#                  added to it
#
# Progress is kept in the cluster info values 'lastid_CMailStore' and
# 'lastmt_CMSReceivers' for the remote cid.
# NOTE(review): declarations of $count, $maxcount, $mscount, $maxid and
# $callback, plus the transaction and error handling, are not visible in
# this excerpt.
507 sub sync_quarantine_db
{
508 my ($ldb, $rdb, $ni, $rsynctime_ref) = @_;
510 my $rcid = $ni->{cid
};
# upper bound used in the loop condition below
512 my $maxmails = 100000;
# remote clock value; stored as the new 'lastmt_CMSReceivers' mark at the end
516 my $ctime = PMG
::DBTools
::get_remote_time
($rdb);
522 PMG
::DBTools
::create_clusterinfo_default
($ldb, $rcid, 'lastid_CMailStore', -1, undef);
524 do { # get new values
# NOTE(review): predictable file name in a world-writable directory, opened
# for writing - consider File::Temp
528 my $flistname = "/tmp/quarantinefilelist.$$";
533 open(my $flistfh, '>', $flistname) ||
534 die "unable to open file '$flistname' - $!\n";
536 my $lastid = PMG
::DBTools
::read_int_clusterinfo
($ldb, $rcid, 'lastid_CMailStore');
# next batch of remote rows above the last copied rid
540 my $sth = $rdb->prepare(
541 "SELECT * from CMailstore WHERE cid = ? AND rid > ? " .
542 "ORDER BY cid,rid LIMIT ?");
543 $sth->execute($rcid, $lastid, $maxcount);
# per-row handling: track the highest rid and list the file for transfer
548 $maxid = $ref->{rid
};
549 my $filename = $ref->{file
};
550 # skip files generated before cluster was created
551 return if $filename !~ m!^cluster/!;
552 print $flistfh "$filename\n";
555 my $attrs = [qw(cid rid time qtype bytes spamlevel info sender header file)];
556 $count += PMG
::DBTools
::copy_selected_data
($ldb, $sth, 'CMailStore', $attrs, $callback);
# transfer the listed files and account the elapsed time
560 my $starttime = [ gettimeofday
() ];
561 sync_quarantine_files
($ni->{ip
}, $ni->{name
}, $flistname, $rcid);
562 $$rsynctime_ref += tv_interval
($starttime);
# receiver rows that belong to the mails copied in this batch
567 $sth = $rdb->prepare(
568 "SELECT * from CMSReceivers WHERE " .
569 "CMailStore_CID = ? AND CMailStore_RID > ? " .
570 "AND CMailStore_RID <= ?");
571 $sth->execute($rcid, $lastid, $maxid);
573 $attrs = [qw(cmailstore_cid cmailstore_rid pmail receiver ticketid status mtime)];
574 PMG
::DBTools
::copy_selected_data
($ldb, $sth, 'CMSReceivers', $attrs);
576 PMG
::DBTools
::write_maxint_clusterinfo
($ldb, $rcid, 'lastid_CMailStore', $maxid);
# repeat while a batch came back full and the mail limit is not reached
592 } while (($count >= $maxcount) && ($mscount < $maxmails));
594 PMG
::DBTools
::create_clusterinfo_default
($ldb, $rcid, 'lastmt_CMSReceivers', 0, undef);
596 eval { # synchronize status updates
599 my $lastmt = PMG
::DBTools
::read_int_clusterinfo
($ldb, $rcid, 'lastmt_CMSReceivers');
# remote receiver rows changed since the last run whose status is not 'N'
601 my $sth = $rdb->prepare ("SELECT * from CMSReceivers WHERE mtime >= ? AND status != 'N'");
602 $sth->execute($lastmt);
604 my $update_sth = $ldb->prepare(
605 "UPDATE CMSReceivers SET status = ? WHERE " .
606 "CMailstore_CID = ? AND CMailstore_RID = ? AND TicketID = ?");
607 while (my $ref = $sth->fetchrow_hashref()) {
608 $update_sth->execute($ref->{status
}, $ref->{cmailstore_cid
},
609 $ref->{cmailstore_rid
}, $ref->{ticketid
});
612 PMG
::DBTools
::write_maxint_clusterinfo
($ldb, $rcid, 'lastmt_CMSReceivers', $ctime);
# Copy new statistic entries (CStatistic rows and their CReceivers rows) of
# one node from the remote database in batches.
#
# $ldb - local database handle (target)
# $rdb - remote database handle (source)
# $ni  - node info hash ('cid' is used)
#
# Progress is kept in the cluster info value 'lastid_CStatistic' for the
# remote cid.
# NOTE(review): declarations of $count, $maxcount, $mscount, $maxid and
# $callback, plus the transaction handling, are not visible in this excerpt.
624 sub sync_statistic_db
{
625 my ($ldb, $rdb, $ni) = @_;
627 my $rcid = $ni->{cid
};
# upper bound used in the loop condition below
629 my $maxmails = 100000;
637 PMG
::DBTools
::create_clusterinfo_default
(
638 $ldb, $rcid, 'lastid_CStatistic', -1, undef);
640 do { # get new values
647 my $lastid = PMG
::DBTools
::read_int_clusterinfo
(
648 $ldb, $rcid, 'lastid_CStatistic');
# next batch of remote rows above the last copied rid
652 my $sth = $rdb->prepare(
653 "SELECT * from CStatistic " .
654 "WHERE cid = ? AND rid > ? " .
655 "ORDER BY cid, rid LIMIT ?");
656 $sth->execute($rcid, $lastid, $maxcount);
# per-row handling: remember the highest rid seen
661 $maxid = $ref->{rid
};
664 my $attrs = [qw(cid rid time bytes direction spamlevel ptime virusinfo sender)];
665 $count += PMG
::DBTools
::copy_selected_data
($ldb, $sth, 'CStatistic', $attrs, $callback);
# receiver rows that belong to the statistic rows copied in this batch
670 $sth = $rdb->prepare(
671 "SELECT * from CReceivers WHERE " .
672 "CStatistic_CID = ? AND CStatistic_RID > ? AND CStatistic_RID <= ?");
673 $sth->execute($rcid, $lastid, $maxid);
675 $attrs = [qw(cstatistic_cid cstatistic_rid blocked receiver)];
676 PMG
::DBTools
::copy_selected_data
($ldb, $sth, 'CReceivers', $attrs);
679 PMG
::DBTools
::write_maxint_clusterinfo
($ldb, $rcid, 'lastid_CStatistic', $maxid);
# repeat while a batch came back full and the limit is not reached
690 } while (($count >= $maxcount) && ($mscount < $maxmails));
# Shared driver for tables that are synchronized by modification time.
#
# $ldb        - local database handle
# $rdb        - remote database handle
# $ni         - node info hash ('cid' selects the cluster info slot)
# $table      - table name; the sync mark is stored as "lastmt_$table"
# $selectfunc - called with ($ctime, $lastmt), returns the SQL text that is
#               prepared on the remote handle
# $mergefunc  - NOTE(review): its call is not visible in this excerpt; it is
#               presumably invoked once per fetched row - confirm
#
# After the rows are processed, the remote time taken at the start is stored
# as the new sync mark.
# NOTE(review): statement execution, the batch commit inside the loop, error
# handling and the return value are not visible here.
695 my $sync_generic_mtime_db = sub {
696 my ($ldb, $rdb, $ni, $table, $selectfunc, $mergefunc) = @_;
698 my $ctime = PMG
::DBTools
::get_remote_time
($rdb);
700 PMG
::DBTools
::create_clusterinfo_default
($ldb, $ni->{cid
}, "lastmt_$table", 0, undef);
702 my $lastmt = PMG
::DBTools
::read_int_clusterinfo
($ldb, $ni->{cid
}, "lastmt_$table");
704 my $sql_cmd = $selectfunc->($ctime, $lastmt);
706 my $sth = $rdb->prepare($sql_cmd);
713 # use transaction to speedup things
714 my $max = 1000; # UPDATE MAX ENTRIES AT ONCE
716 while (my $ref = $sth->fetchrow_hashref()) {
# open a local transaction when a new batch starts
717 $ldb->begin_work if !$count;
719 if (++$count >= $max) {
# commit a batch that is still open after the last row
726 $ldb->commit if $count;
733 PMG
::DBTools
::write_maxint_clusterinfo
($ldb, $ni->{cid
}, "lastmt_$table", $ctime);
# Merge the LocalStat rows of one remote node into the local database.
#
# $dbh - local database handle (target of the upsert)
# $rdb - remote database handle (source)
# $ni  - node info hash; 'cid' restricts the remote query to that node
#
# Returns the result of the shared $sync_generic_mtime_db driver.
sub sync_localstat_db {
    my ($dbh, $rdb, $ni) = @_;

    my $rcid = $ni->{cid};

    # remote side: rows of that node changed since the last sync mark
    my $build_query = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from LocalStat WHERE mtime >= $lastmt AND cid = $rcid";
    };

    # local side: insert, or overwrite the counters of an existing row
    my $upsert_sql = join('',
        'INSERT INTO LocalStat (Time, RBLCount, PregreetCount, CID, MTime) ',
        'VALUES (?, ?, ?, ?, ?) ',
        'ON CONFLICT (Time, CID) DO UPDATE SET ',
        'RBLCount = excluded.RBLCount, PregreetCount = excluded.PregreetCount, MTime = excluded.MTime',
    );
    my $upsert = $dbh->prepare($upsert_sql);

    my @columns = qw(time rblcount pregreetcount cid mtime);
    my $store_row = sub {
        my ($row) = @_;

        $upsert->execute(@{$row}{@columns});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'LocalStat', $build_query, $store_row);
}
# Merge greylist entries from the remote database into the local one.
#
# $dbh - local database handle (target of the merge statement)
# $rdb - remote database handle (source)
# $ni  - node info hash, handed on to the shared driver
#
# Returns the result of the shared $sync_generic_mtime_db driver.
sub sync_greylist_db {
    my ($dbh, $rdb, $ni) = @_;

    # remote side: entries not yet expired at remote time $ctime, changed
    # since the last sync mark, and carrying a non-zero CID
    my $build_query = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from CGreylist WHERE extime >= $ctime AND " .
            "mtime >= $lastmt AND CID != 0";
    };

    my $merge = $dbh->prepare($PMG::DBTools::cgreylist_merge_sql);

    my @leading_columns = qw(
        ipnet host sender receiver
        instance rctime extime delay
        blocked passed
    );
    my $store_row = sub {
        my ($row) = @_;

        # a literal 0 goes between the copied columns and the cid
        $merge->execute(@{$row}{@leading_columns}, 0, $row->{cid});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'CGreylist', $build_query, $store_row);
}
# Merge user preferences from the remote database into the local one.
#
# $dbh - local database handle (target of the upsert)
# $rdb - remote database handle (source)
# $ni  - node info hash, handed on to the shared driver
#
# On a key conflict the remote data only wins when its MTime is not older
# than the local one.
#
# Returns the result of the shared $sync_generic_mtime_db driver.
sub sync_userprefs_db {
    my ($dbh, $rdb, $ni) = @_;

    # remote side: rows changed since the last sync mark
    my $build_query = sub {
        my ($ctime, $lastmt) = @_;

        return "SELECT * from UserPrefs WHERE mtime >= $lastmt";
    };

    my $upsert_sql = join('',
        "INSERT INTO UserPrefs (PMail, Name, Data, MTime) ",
        'VALUES (?, ?, ?, ?) ',
        'ON CONFLICT (PMail, Name) DO UPDATE SET ',
        # Note: MTime = 0 ==> this is just a copy from somewhere else, not modified
        'MTime = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN 0 ELSE UserPrefs.MTime END, ',
        'Data = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN excluded.Data ELSE UserPrefs.Data END',
    );
    my $upsert = $dbh->prepare($upsert_sql);

    my @columns = qw(pmail name data mtime);
    my $store_row = sub {
        my ($row) = @_;

        $upsert->execute(@{$row}{@columns});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'UserPrefs', $build_query, $store_row);
}
# Merge per-domain statistics from the remote database into the local one.
#
# $dbh - local database handle (target of the upsert)
# $rdb - remote database handle (source)
# $ni  - node info hash, handed on to the shared driver
#
# An existing (Time, Domain) row is overwritten with the remote values.
#
# Returns the result of the shared $sync_generic_mtime_db driver.
sub sync_domainstat_db {
    my ($dbh, $rdb, $ni) = @_;

    # remote side: rows changed since the last sync mark
    my $build_query = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from DomainStat WHERE mtime >= $lastmt";
    };

    my $upsert_sql = join('',
        'INSERT INTO Domainstat ',
        '(Time,Domain,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,',
        'BouncesIn,BouncesOut,PTimeSum,Mtime) ',
        'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) ',
        'ON CONFLICT (Time, Domain) DO UPDATE SET ',
        'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ',
        'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ',
        'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ',
        'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ',
        'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ',
        'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime',
    );
    my $upsert = $dbh->prepare($upsert_sql);

    # same order as the column list of the INSERT
    my @columns = qw(
        time domain countin countout
        bytesin bytesout
        virusin virusout spamin spamout
        bouncesin bouncesout ptimesum mtime
    );
    my $store_row = sub {
        my ($row) = @_;

        $upsert->execute(@{$row}{@columns});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DomainStat', $build_query, $store_row);
}
# Merge daily statistics from the remote database into the local one.
#
# $dbh - local database handle (target of the upsert)
# $rdb - remote database handle (source)
# $ni  - node info hash, handed on to the shared driver
#
# An existing row for the same Time is overwritten with the remote values.
#
# Returns the result of the shared $sync_generic_mtime_db driver.
sub sync_dailystat_db {
    my ($dbh, $rdb, $ni) = @_;

    # remote side: rows changed since the last sync mark
    my $build_query = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from DailyStat WHERE mtime >= $lastmt";
    };

    my $upsert_sql = join('',
        'INSERT INTO DailyStat ',
        '(Time,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,',
        'BouncesIn,BouncesOut,GreylistCount,SPFCount,RBLCount,PTimeSum,Mtime) ',
        'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ',
        'ON CONFLICT (Time) DO UPDATE SET ',
        'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ',
        'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ',
        'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ',
        'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ',
        'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ',
        'GreylistCount = excluded.GreylistCount, SPFCount = excluded.SpfCount, ',
        'RBLCount = excluded.RBLCount, ',
        'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime',
    );
    my $upsert = $dbh->prepare($upsert_sql);

    # same order as the column list of the INSERT
    my @columns = qw(
        time countin countout
        bytesin bytesout
        virusin virusout spamin spamout
        bouncesin bouncesout greylistcount
        spfcount rblcount ptimesum mtime
    );
    my $store_row = sub {
        my ($row) = @_;

        $upsert->execute(@{$row}{@columns});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DailyStat', $build_query, $store_row);
}
# Merge virus statistics from the remote database into the local one.
#
# $dbh - local database handle (target of the upsert)
# $rdb - remote database handle (source)
# $ni  - node info hash, handed on to the shared driver
#
# An existing (Time, Name) row gets the remote Count and MTime.
#
# Returns the result of the shared $sync_generic_mtime_db driver.
sub sync_virusinfo_db {
    my ($dbh, $rdb, $ni) = @_;

    # remote side: rows changed since the last sync mark
    my $build_query = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from VirusInfo WHERE mtime >= $lastmt";
    };

    my $upsert_sql = join('',
        'INSERT INTO VirusInfo (Time,Name,Count,MTime) ',
        'VALUES (?,?,?,?) ',
        'ON CONFLICT (Time,Name) DO UPDATE SET ',
        'Count = excluded.Count , MTime = excluded.MTime',
    );
    my $upsert = $dbh->prepare($upsert_sql);

    my @columns = qw(time name count mtime);
    my $store_row = sub {
        my ($row) = @_;

        $upsert->execute(@{$row}{@columns});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'VirusInfo', $build_query, $store_row);
}
905 sub sync_deleted_nodes_from_master
{
906 my ($ldb, $masterdb, $cinfo, $masterni, $rsynctime_ref) = @_;
910 my $cid_hash = {}; # fast lookup
911 foreach my $ni (values %{$cinfo->{ids
}}) {
912 $cid_hash->{$ni->{cid
}} = $ni;
915 my $spooldir = $PMG::MailQueue
::spooldir
;
917 my $maxcid = $cinfo->{master
}->{maxcid
} // 0;
919 for (my $rcid = 1; $rcid <= $maxcid; $rcid++) {
920 next if $cid_hash->{$rcid};
922 my $done_marker = "$spooldir/cluster/$rcid/.synced-deleted-node";
924 next if -f
$done_marker; # already synced
926 syslog
('info', "syncing deleted node $rcid from master '$masterni->{ip}'");
928 my $starttime = [ gettimeofday
() ];
929 sync_spooldir
($masterni->{ip
}, $masterni->{name
}, $rcid);
930 $$rsynctime_ref += tv_interval
($starttime);
933 ip
=> $masterni->{ip
},
934 name
=> $masterni->{name
},
938 sync_quarantine_db
($ldb, $masterdb, $fake_ni);
940 sync_statistic_db
($ldb, $masterdb, $fake_ni);
942 open(my $fh, ">>", $done_marker);