use Data::Dumper;
use Socket;
use File::Path;
+use Time::HiRes qw (gettimeofday tv_interval);
use PVE::SafeSyslog;
use PVE::Tools;
use PVE::INotify;
+use PVE::APIClient::LWP;
use PMG::Utils;
use PMG::Config;
use PMG::ClusterConfig;
-
-our $spooldir = "/var/spool/proxmox";
-
-sub create_needed_dirs {
- my ($lcid, $cleanup) = @_;
-
- # if requested, remove any stale date
- File::Path::remove_tree("$spooldir/cluster", "$spooldir/virus", "$spooldir/spam") if $cleanup;
-
- mkdir "$spooldir/spam";
- mkdir "$spooldir/virus";
-
- if ($lcid) {
- mkpath "$spooldir/cluster/$lcid/virus";
- mkpath "$spooldir/cluster/$lcid/spam";
- }
-}
+use PMG::RuleDB;
+use PMG::RuleCache;
+use PMG::MailQueue;
+use PMG::Fetchmail;
sub remote_node_ip {
my ($nodename, $noerr) = @_;
sub update_ssh_keys {
my ($cinfo) = @_;
+ my $old = '';
my $data = '';
+
foreach my $node (values %{$cinfo->{ids}}) {
$data .= "$node->{ip} ssh-rsa $node->{hostrsapubkey}\n";
$data .= "$node->{name} ssh-rsa $node->{hostrsapubkey}\n";
}
- PVE::Tools::file_set_contents($sshglobalknownhosts, $data);
+ $old = PVE::Tools::file_get_contents($sshglobalknownhosts, 1024*1024)
+ if -f $sshglobalknownhosts;
+
+ PVE::Tools::file_set_contents($sshglobalknownhosts, $data)
+ if $old ne $data;
$data = '';
+ $old = '';
# always add ourself
if (-f $ssh_rsa_id) {
$newdata .= "$line\n";
}
- PVE::Tools::file_set_contents($rootsshauthkeys, $newdata, 0600);
+ $old = PVE::Tools::file_get_contents($rootsshauthkeys, 1024*1024)
+ if -f $rootsshauthkeys;
+
+ PVE::Tools::file_set_contents($rootsshauthkeys, $newdata, 0600)
+ if $old ne $newdata;
}
my $cfgdir = '/etc/pmg';
return 0 if $new eq $old;
}
+ # set mtime (touch) to avoid time drift problems
+ utime(undef, undef, $srcfn);
+
rename($srcfn, $dstfn) ||
die "cond_rename_file '$filename' failed - $!\n";
return 1;
};
-sub sync_config_from_master {
- my ($cinfo, $master_ip, $noreload) = @_;
+my $rsync_command = sub {
+ my ($host_key_alias, @args) = @_;
- my $local_ip = $cinfo->{local}->{ip};
- my $local_name = $cinfo->{local}->{name};
+ my $ssh_cmd = '--rsh=ssh -l root -o BatchMode=yes';
+ $ssh_cmd .= " -o HostKeyAlias=${host_key_alias}" if $host_key_alias;
- if ($local_ip eq $master_ip) {
- print STDERR "local node is master - nothing to do\n";
- return;
+ my $cmd = ['rsync', $ssh_cmd, '-q', @args];
+
+ return $cmd;
+};
+
+sub sync_quarantine_files {
+ my ($host_ip, $host_name, $flistname, $rcid) = @_;
+
+ my $spooldir = $PMG::MailQueue::spooldir;
+
+ mkdir "$spooldir/cluster/";
+ my $syncdir = "$spooldir/cluster/$rcid";
+ mkdir $syncdir;
+
+ my $cmd = $rsync_command->(
+ $host_name, '--timeout', '10', "[${host_ip}]:$spooldir", $spooldir,
+ '--files-from', $flistname);
+
+ PVE::Tools::run_command($cmd);
+}
+
+sub sync_spooldir {
+ my ($host_ip, $host_name, $rcid) = @_;
+
+ my $spooldir = $PMG::MailQueue::spooldir;
+
+ mkdir "$spooldir/cluster/";
+ my $syncdir = "$spooldir/cluster/$rcid";
+ mkdir $syncdir;
+
+ my $cmd = $rsync_command->(
+ $host_name, '-aq', '--timeout', '10', "[${host_ip}]:$syncdir/", $syncdir);
+
+ foreach my $incl (('spam/', 'spam/*', 'spam/*/*', 'virus/', 'virus/*', 'virus/*/*')) {
+ push @$cmd, '--include', $incl;
}
+ push @$cmd, '--exclude', '*';
+
+ PVE::Tools::run_command($cmd);
+}
+
+sub sync_master_quar {
+ my ($host_ip, $host_name) = @_;
+
+ my $spooldir = $PMG::MailQueue::spooldir;
+
+ my $syncdir = "$spooldir/cluster/";
+ mkdir $syncdir;
+
+ my $cmd = $rsync_command->(
+ $host_name, '-aq', '--timeout', '10', "[${host_ip}]:$syncdir", $syncdir);
+
+ PVE::Tools::run_command($cmd);
+}
+
+sub sync_config_from_master {
+ my ($master_name, $master_ip, $noreload) = @_;
+
mkdir $syncdir;
File::Path::remove_tree($syncdir, {keep_root => 1});
my $sa_conf_dir = "/etc/mail/spamassassin";
my $sa_custom_cf = "custom.cf";
- my $cmd = ['rsync', '--rsh=ssh -l root -o BatchMode=yes', '-lpgoq',
- "${master_ip}:$cfgdir/* ${sa_conf_dir}/${sa_custom_cf}",
- "$syncdir/",
- '--exclude', '*~',
- '--exclude', '*.db',
- '--exclude', 'pmg-api.pem',
- '--exclude', 'pmg-tls.pem',
- ];
+ my $cmd = $rsync_command->(
+ $master_name, '-aq',
+ "[${master_ip}]:$cfgdir/* ${sa_conf_dir}/${sa_custom_cf}",
+ "$syncdir/",
+ '--exclude', 'master/',
+ '--exclude', '*~',
+ '--exclude', '*.db',
+ '--exclude', 'pmg-api.pem',
+ '--exclude', 'pmg-tls.pem',
+ );
my $errmsg = "syncing master configuration from '${master_ip}' failed";
PVE::Tools::run_command($cmd, errmsg => $errmsg);
# verify that the remote host is cluster master
open (my $fh, '<', "$syncdir/cluster.conf") ||
die "unable to open synced cluster.conf - $!\n";
- my $newcinfo = PMG::ClusterConfig::read_cluster_conf('cluster.conf', $fh);
- if (!$newcinfo->{master} || ($newcinfo->{master}->{ip} ne $master_ip)) {
+ my $cinfo = PMG::ClusterConfig::read_cluster_conf('cluster.conf', $fh);
+
+ if (!$cinfo->{master} || ($cinfo->{master}->{ip} ne $master_ip)) {
die "host '$master_ip' is not cluster master\n";
}
- my $role = $newcinfo->{'local'}->{type} // '-';
- die "local node '$newcinfo->{local}->{name}' not part of cluster\n"
+ my $role = $cinfo->{'local'}->{type} // '-';
+ die "local node '$cinfo->{local}->{name}' not part of cluster\n"
if $role eq '-';
- die "local node '$newcinfo->{local}->{name}' is new cluster master\n"
+ die "local node '$cinfo->{local}->{name}' is new cluster master\n"
if $role eq 'master';
-
$cond_commit_synced_file->('cluster.conf');
- $cinfo = $newcinfo;
+
+ update_ssh_keys($cinfo); # rewrite ssh keys
+
+ PMG::Fetchmail::update_fetchmail_default(0); # disable on slave
my $files = [
'pmg-authkey.key',
'pmg-csrf.key',
'ldap.conf',
'user.conf',
+ 'domains',
+ 'mynetworks',
+ 'transport',
+ 'tls_policy',
+ 'fetchmailrc',
];
foreach my $filename (@$files) {
$cond_commit_synced_file->($filename);
}
+
my $force_restart = {};
if ($cond_commit_synced_file->($sa_custom_cf, "${sa_conf_dir}/${sa_custom_cf}")) {
$cond_commit_synced_file->('pmg.conf');
- my $cfg = PMG::Config->new();
+ # sync user templates files/symlinks (not recursive)
+ my $srcdir = "$syncdir/templates";
+ if (-d $srcdir) {
+ my $dstdir = "$cfgdir/templates";
+ mkdir $dstdir;
+ my $names_hash = {};
+ foreach my $fn (<$srcdir/*>) {
+ next if $fn !~ m|^($srcdir/(.*))$|;
+ $fn = $1; # untaint;
+ my $name = $2;
+ $names_hash->{$name} = 1;
+ my $target = "$dstdir/$name";
+ if (-f $fn) {
+ $cond_commit_synced_file->("templates/$name", $target);
+ } elsif (-l $fn) {
+ warn "update $target failed - $!\n" if !rename($fn, $target);
+ }
+ }
+ # remove vanished files
+ foreach my $fn (<$dstdir/*>) {
+ next if $fn !~ m|^($dstdir/(.*))$|;
+ $fn = $1; # untaint;
+ my $name = $2;
+ next if $names_hash->{$name};
+ warn "unlink $fn failed - $!\n" if !unlink($fn);
+ }
+ }
+}
+
+sub sync_ruledb_from_master {
+ my ($ldb, $rdb, $ni, $ticket) = @_;
+
+ my $ruledb = PMG::RuleDB->new($ldb);
+ my $rulecache = PMG::RuleCache->new($ruledb);
+
+ my $conn = PVE::APIClient::LWP->new(
+ ticket => $ticket,
+ cookie_name => 'PMGAuthCookie',
+ host => $ni->{ip},
+ cached_fingerprints => {
+ $ni->{fingerprint} => 1,
+ });
+
+ my $digest = $conn->get("/config/ruledb/digest", {});
+
+ return if $digest eq $rulecache->{digest}; # no changes
+
+ syslog('info', "detected rule database changes - starting sync from '$ni->{ip}'");
+
+ eval {
+ $ldb->begin_work;
+
+ $ldb->do("DELETE FROM Rule");
+ $ldb->do("DELETE FROM RuleGroup");
+ $ldb->do("DELETE FROM ObjectGroup");
+ $ldb->do("DELETE FROM Object");
+ $ldb->do("DELETE FROM Attribut");
+
+ eval {
+ $rdb->begin_work;
+
+ # read a consistent snapshot
+ $rdb->do("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
+
+ PMG::DBTools::copy_table($ldb, $rdb, "Rule");
+ PMG::DBTools::copy_table($ldb, $rdb, "RuleGroup");
+ PMG::DBTools::copy_table($ldb, $rdb, "ObjectGroup");
+ PMG::DBTools::copy_table($ldb, $rdb, "Object", 'value');
+ PMG::DBTools::copy_table($ldb, $rdb, "Attribut", 'value');
+ };
+
+ $rdb->rollback; # end transaction
+
+ die $@ if $@;
+
+ # update sequences
+
+ $ldb->do("SELECT setval('rule_id_seq', max(id)+1) FROM Rule");
+ $ldb->do("SELECT setval('object_id_seq', max(id)+1) FROM Object");
+ $ldb->do("SELECT setval('objectgroup_id_seq', max(id)+1) FROM ObjectGroup");
+
+ $ldb->commit;
+ };
+ if (my $err = $@) {
+ $ldb->rollback;
+ die $err;
+ }
+
+ syslog('info', "finished rule database sync from host '$ni->{ip}'");
+}
+
+sub sync_quarantine_db {
+ my ($ldb, $rdb, $ni, $rsynctime_ref) = @_;
+
+ my $rcid = $ni->{cid};
+
+ my $maxmails = 100000;
+
+ my $mscount = 0;
+
+ my $ctime = PMG::DBTools::get_remote_time($rdb);
+
+ my $maxcount = 1000;
+
+ my $count;
+
+ PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastid_CMailStore', -1, undef);
+
+ do { # get new values
+
+ $count = 0;
+
+ my $flistname = "/tmp/quarantinefilelist.$$";
+
+ eval {
+ $ldb->begin_work;
+
+ open(my $flistfh, '>', $flistname) ||
+ die "unable to open file '$flistname' - $!\n";
+
+ my $lastid = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastid_CMailStore');
+
+ # sync CMailStore
+
+ my $sth = $rdb->prepare(
+ "SELECT * from CMailstore WHERE cid = ? AND rid > ? " .
+ "ORDER BY cid,rid LIMIT ?");
+ $sth->execute($rcid, $lastid, $maxcount);
+
+ my $maxid;
+ my $callback = sub {
+ my $ref = shift;
+ $maxid = $ref->{rid};
+ my $filename = $ref->{file};
+ # skip files generated before cluster was created
+ return if $filename !~ m!^cluster/!;
+ print $flistfh "$filename\n";
+ };
+
+ my $attrs = [qw(cid rid time qtype bytes spamlevel info sender header file)];
+ $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CMailStore', $attrs, $callback);
+
+ close($flistfh);
+
+ my $starttime = [ gettimeofday() ];
+ sync_quarantine_files($ni->{ip}, $ni->{name}, $flistname, $rcid);
+ $$rsynctime_ref += tv_interval($starttime);
+
+ if ($maxid) {
+ # sync CMSReceivers
+
+ $sth = $rdb->prepare(
+ "SELECT * from CMSReceivers WHERE " .
+ "CMailStore_CID = ? AND CMailStore_RID > ? " .
+ "AND CMailStore_RID <= ?");
+ $sth->execute($rcid, $lastid, $maxid);
+
+ $attrs = [qw(cmailstore_cid cmailstore_rid pmail receiver ticketid status mtime)];
+ PMG::DBTools::copy_selected_data($ldb, $sth, 'CMSReceivers', $attrs);
+
+ PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastid_CMailStore', $maxid);
+ }
+
+ $ldb->commit;
+ };
+ my $err = $@;
+
+ unlink $flistname;
+
+ if ($err) {
+ $ldb->rollback;
+ die $err;
+ }
+
+ $mscount += $count;
+
+ } while (($count >= $maxcount) && ($mscount < $maxmails));
+
+ PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastmt_CMSReceivers', 0, undef);
+
+ eval { # synchronize status updates
+ $ldb->begin_work;
+
+ my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers');
+
+ my $sth = $rdb->prepare ("SELECT * from CMSReceivers WHERE mtime >= ? AND status != 'N'");
+ $sth->execute($lastmt);
+
+ my $update_sth = $ldb->prepare(
+ "UPDATE CMSReceivers SET status = ? WHERE " .
+ "CMailstore_CID = ? AND CMailstore_RID = ? AND TicketID = ?");
+ while (my $ref = $sth->fetchrow_hashref()) {
+ $update_sth->execute($ref->{status}, $ref->{cmailstore_cid},
+ $ref->{cmailstore_rid}, $ref->{ticketid});
+ }
+
+ PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers', $ctime);
+
+ $ldb->commit;
+ };
+ if (my $err = $@) {
+ $ldb->rollback;
+ die $err;
+ }
+
+ return $mscount;
+}
+
+sub sync_statistic_db {
+ my ($ldb, $rdb, $ni) = @_;
+
+ my $rcid = $ni->{cid};
+
+ my $maxmails = 100000;
+
+ my $mscount = 0;
+
+ my $maxcount = 1000;
+
+ my $count;
+
+ PMG::DBTools::create_clusterinfo_default(
+ $ldb, $rcid, 'lastid_CStatistic', -1, undef);
+
+ do { # get new values
+
+ $count = 0;
+
+ eval {
+ $ldb->begin_work;
+
+ my $lastid = PMG::DBTools::read_int_clusterinfo(
+ $ldb, $rcid, 'lastid_CStatistic');
+
+ # sync CStatistic
+
+ my $sth = $rdb->prepare(
+ "SELECT * from CStatistic " .
+ "WHERE cid = ? AND rid > ? " .
+ "ORDER BY cid, rid LIMIT ?");
+ $sth->execute($rcid, $lastid, $maxcount);
+
+ my $maxid;
+ my $callback = sub {
+ my $ref = shift;
+ $maxid = $ref->{rid};
+ };
+
+ my $attrs = [qw(cid rid time bytes direction spamlevel ptime virusinfo sender)];
+ $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CStatistic', $attrs, $callback);
+
+ if ($maxid) {
+ # sync CReceivers
+
+ $sth = $rdb->prepare(
+ "SELECT * from CReceivers WHERE " .
+ "CStatistic_CID = ? AND CStatistic_RID > ? AND CStatistic_RID <= ?");
+ $sth->execute($rcid, $lastid, $maxid);
+
+ $attrs = [qw(cstatistic_cid cstatistic_rid blocked receiver)];
+ PMG::DBTools::copy_selected_data($ldb, $sth, 'CReceivers', $attrs);
+ }
+
+ PMG::DBTools::write_maxint_clusterinfo ($ldb, $rcid, 'lastid_CStatistic', $maxid);
+
+ $ldb->commit;
+ };
+ if (my $err = $@) {
+ $ldb->rollback;
+ die $err;
+ }
+
+ $mscount += $count;
+
+ } while (($count >= $maxcount) && ($mscount < $maxmails));
+
+ return $mscount;
+}
+
+my $sync_generic_mtime_db = sub {
+ my ($ldb, $rdb, $ni, $table, $selectfunc, $mergefunc) = @_;
+
+ my $ctime = PMG::DBTools::get_remote_time($rdb);
+
+ PMG::DBTools::create_clusterinfo_default($ldb, $ni->{cid}, "lastmt_$table", 0, undef);
+
+ my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $ni->{cid}, "lastmt_$table");
+
+ my $sql_cmd = $selectfunc->($ctime, $lastmt);
+
+ my $sth = $rdb->prepare($sql_cmd);
+
+ $sth->execute();
+
+ my $updates = 0;
+
+ eval {
+ # use transaction to speedup things
+ my $max = 1000; # UPDATE MAX ENTRIES AT ONCE
+ my $count = 0;
+ while (my $ref = $sth->fetchrow_hashref()) {
+ $ldb->begin_work if !$count;
+ $mergefunc->($ref);
+ if (++$count >= $max) {
+ $count = 0;
+ $ldb->commit;
+ }
+ $updates++;
+ }
+
+ $ldb->commit if $count;
+ };
+ if (my $err = $@) {
+ $ldb->rollback;
+ die $err;
+ }
+
+ PMG::DBTools::write_maxint_clusterinfo($ldb, $ni->{cid}, "lastmt_$table", $ctime);
+
+ return $updates;
+};
+
+sub sync_localstat_db {
+ my ($dbh, $rdb, $ni) = @_;
+
+ my $rcid = $ni->{cid};
+
+ my $selectfunc = sub {
+ my ($ctime, $lastmt) = @_;
+ return "SELECT * from LocalStat WHERE mtime >= $lastmt AND cid = $rcid";
+ };
+
+ my $merge_sth = $dbh->prepare(
+ 'INSERT INTO LocalStat (Time, RBLCount, PregreetCount, CID, MTime) ' .
+ 'VALUES (?, ?, ?, ?, ?) ' .
+ 'ON CONFLICT (Time, CID) DO UPDATE SET ' .
+ 'RBLCount = excluded.RBLCount, PregreetCount = excluded.PregreetCount, MTime = excluded.MTime');
+
+ my $mergefunc = sub {
+ my ($ref) = @_;
+
+ $merge_sth->execute($ref->{time}, $ref->{rblcount}, $ref->{pregreetcount}, $ref->{cid}, $ref->{mtime});
+ };
+
+ return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'LocalStat', $selectfunc, $mergefunc);
+}
+
+sub sync_greylist_db {
+ my ($dbh, $rdb, $ni) = @_;
+
+ my $selectfunc = sub {
+ my ($ctime, $lastmt) = @_;
+ return "SELECT * from CGreylist WHERE extime >= $ctime AND " .
+ "mtime >= $lastmt AND CID != 0";
+ };
+
+ my $merge_sth = $dbh->prepare($PMG::DBTools::cgreylist_merge_sql);
+ my $mergefunc = sub {
+ my ($ref) = @_;
+
+ $merge_sth->execute(
+ $ref->{ipnet}, $ref->{host}, $ref->{sender}, $ref->{receiver},
+ $ref->{instance}, $ref->{rctime}, $ref->{extime}, $ref->{delay},
+ $ref->{blocked}, $ref->{passed}, 0, $ref->{cid});
+ };
+
+ return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'CGreylist', $selectfunc, $mergefunc);
+}
+
+sub sync_userprefs_db {
+ my ($dbh, $rdb, $ni) = @_;
+
+ my $selectfunc = sub {
+ my ($ctime, $lastmt) = @_;
+
+ return "SELECT * from UserPrefs WHERE mtime >= $lastmt";
+ };
+
+ my $merge_sth = $dbh->prepare(
+ "INSERT INTO UserPrefs (PMail, Name, Data, MTime) " .
+ 'VALUES (?, ?, ?, ?) ' .
+ 'ON CONFLICT (PMail, Name) DO UPDATE SET ' .
+ # Note: MTime = 0 ==> this is just a copy from somewhere else, not modified
+ 'MTime = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN 0 ELSE UserPrefs.MTime END, ' .
+ 'Data = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN excluded.Data ELSE UserPrefs.Data END');
+
+ my $mergefunc = sub {
+ my ($ref) = @_;
+
+ $merge_sth->execute($ref->{pmail}, $ref->{name}, $ref->{data}, $ref->{mtime});
+ };
+
+ return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'UserPrefs', $selectfunc, $mergefunc);
+}
+
+sub sync_domainstat_db {
+ my ($dbh, $rdb, $ni) = @_;
+
+ my $selectfunc = sub {
+ my ($ctime, $lastmt) = @_;
+ return "SELECT * from DomainStat WHERE mtime >= $lastmt";
+ };
+
+ my $merge_sth = $dbh->prepare(
+ 'INSERT INTO Domainstat ' .
+ '(Time,Domain,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
+ 'BouncesIn,BouncesOut,PTimeSum,Mtime) ' .
+ 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
+ 'ON CONFLICT (Time, Domain) DO UPDATE SET ' .
+ 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
+ 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
+ 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
+ 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
+ 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
+ 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
+
+ my $mergefunc = sub {
+ my ($ref) = @_;
+
+ $merge_sth->execute(
+ $ref->{time}, $ref->{domain}, $ref->{countin}, $ref->{countout},
+ $ref->{bytesin}, $ref->{bytesout},
+ $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout},
+ $ref->{bouncesin}, $ref->{bouncesout}, $ref->{ptimesum}, $ref->{mtime});
+ };
+
+ return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DomainStat', $selectfunc, $mergefunc);
+}
+
+sub sync_dailystat_db {
+ my ($dbh, $rdb, $ni) = @_;
+
+ my $selectfunc = sub {
+ my ($ctime, $lastmt) = @_;
+ return "SELECT * from DailyStat WHERE mtime >= $lastmt";
+ };
+
+ my $merge_sth = $dbh->prepare(
+ 'INSERT INTO DailyStat ' .
+ '(Time,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
+ 'BouncesIn,BouncesOut,GreylistCount,SPFCount,RBLCount,PTimeSum,Mtime) ' .
+ 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
+ 'ON CONFLICT (Time) DO UPDATE SET ' .
+ 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
+ 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
+ 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
+ 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
+ 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
+ 'GreylistCount = excluded.GreylistCount, SPFCount = excluded.SpfCount, ' .
+ 'RBLCount = excluded.RBLCount, ' .
+ 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
+
+ my $mergefunc = sub {
+ my ($ref) = @_;
+
+ $merge_sth->execute(
+ $ref->{time}, $ref->{countin}, $ref->{countout},
+ $ref->{bytesin}, $ref->{bytesout},
+ $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout},
+ $ref->{bouncesin}, $ref->{bouncesout}, $ref->{greylistcount},
+ $ref->{spfcount}, $ref->{rblcount}, $ref->{ptimesum}, $ref->{mtime});
+ };
+
+ return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DailyStat', $selectfunc, $mergefunc);
+}
+
+sub sync_virusinfo_db {
+ my ($dbh, $rdb, $ni) = @_;
+
+ my $selectfunc = sub {
+ my ($ctime, $lastmt) = @_;
+ return "SELECT * from VirusInfo WHERE mtime >= $lastmt";
+ };
+
+ my $merge_sth = $dbh->prepare(
+ 'INSERT INTO VirusInfo (Time,Name,Count,MTime) ' .
+ 'VALUES (?,?,?,?) ' .
+ 'ON CONFLICT (Time,Name) DO UPDATE SET ' .
+ 'Count = excluded.Count , MTime = excluded.MTime');
+
+ my $mergefunc = sub {
+ my ($ref) = @_;
+
+ $merge_sth->execute($ref->{time}, $ref->{name}, $ref->{count}, $ref->{mtime});
+ };
+
+ return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'VirusInfo', $selectfunc, $mergefunc);
+}
+
+sub sync_deleted_nodes_from_master {
+ my ($ldb, $masterdb, $cinfo, $masterni, $rsynctime_ref) = @_;
- $cfg->rewrite_config(1, $force_restart);
+ my $rsynctime = 0;
+
+ my $cid_hash = {}; # fast lookup
+ foreach my $ni (values %{$cinfo->{ids}}) {
+ $cid_hash->{$ni->{cid}} = $ni;
+ }
+
+ my $spooldir = $PMG::MailQueue::spooldir;
+
+ my $maxcid = $cinfo->{master}->{maxcid} // 0;
+
+ for (my $rcid = 1; $rcid <= $maxcid; $rcid++) {
+ next if $cid_hash->{$rcid};
+
+ my $done_marker = "$spooldir/cluster/$rcid/.synced-deleted-node";
+
+ next if -f $done_marker; # already synced
+
+ syslog('info', "syncing deleted node $rcid from master '$masterni->{ip}'");
+
+ my $starttime = [ gettimeofday() ];
+ sync_spooldir($masterni->{ip}, $masterni->{name}, $rcid);
+ $$rsynctime_ref += tv_interval($starttime);
+
+ my $fake_ni = {
+ ip => $masterni->{ip},
+ name => $masterni->{name},
+ cid => $rcid,
+ };
+
+ sync_quarantine_db($ldb, $masterdb, $fake_ni);
+
+ sync_statistic_db ($ldb, $masterdb, $fake_ni);
+
+ open(my $fh, ">>", $done_marker);
+ }
}
+
1;