]> git.proxmox.com Git - pmg-api.git/blobdiff - PMG/Cluster.pm
do not use 'last' inside do/while
[pmg-api.git] / PMG / Cluster.pm
index f9801be75ba7a735af6939ff9a2a4ddbc6ac349e..9e456f16e4b9f3690b8681f424a7e0ec2de1bc9f 100644 (file)
@@ -2,43 +2,30 @@ package PMG::Cluster;
 
 use strict;
 use warnings;
-
+use Data::Dumper;
 use Socket;
+use File::Path;
+use Time::HiRes qw (gettimeofday tv_interval);
+
+use PVE::SafeSyslog;
 use PVE::Tools;
 use PVE::INotify;
+use PVE::APIClient::LWP;
 
-# this is also used to get the IP of the local node
-sub lookup_node_ip {
-    my ($nodename, $noerr) = @_;
-
-    my ($family, $packed_ip);
-
-    eval {
-       my @res = PVE::Tools::getaddrinfo_all($nodename);
-       $family = $res[0]->{family};
-       $packed_ip = (PVE::Tools::unpack_sockaddr_in46($res[0]->{addr}))[2];
-    };
-
-    if ($@) {
-       die "hostname lookup failed:\n$@" if !$noerr;
-       return undef;
-    }
-
-    my $ip = Socket::inet_ntop($family, $packed_ip);
-    if ($ip =~ m/^127\.|^::1$/) {
-       die "hostname lookup failed - got local IP address ($nodename = $ip)\n" if !$noerr;
-       return undef;
-    }
-
-    return wantarray ? ($ip, $family) : $ip;
-}
+use PMG::Utils;
+use PMG::Config;
+use PMG::ClusterConfig;
+use PMG::RuleDB;
+use PMG::RuleCache;
+use PMG::MailQueue;
+use PMG::Fetchmail;
 
 sub remote_node_ip {
     my ($nodename, $noerr) = @_;
 
-    my $cinfo = PVE::INotify::read_file("cluster.conf");
+    my $cinfo = PMG::ClusterConfig->new();
 
-    foreach my $entry (@{$cinfo->{nodes}}) {
+    foreach my $entry (values %{$cinfo->{ids}}) {
        if ($entry->{name} eq $nodename) {
            my $ip = $entry->{ip};
            return $ip if !wantarray;
@@ -48,85 +35,116 @@ sub remote_node_ip {
     }
 
     # fallback: try to get IP by other means
-    return lookup_node_ip($nodename, $noerr);
+    return PMG::Utils::lookup_node_ip($nodename, $noerr);
 }
 
 sub get_master_node {
     my ($cinfo) = @_;
 
-    my $cinfo //= PVE::INotify::read_file("cluster.conf");
+    $cinfo = PMG::ClusterConfig->new() if !$cinfo;
 
     return $cinfo->{master}->{name} if defined($cinfo->{master});
 
     return 'localhost';
 }
 
-# X509 Certificate cache helper
+sub read_local_ssl_cert_fingerprint {
+    my $cert_path = "/etc/pmg/pmg-api.pem";
 
-my $cert_cache_nodes = {};
-my $cert_cache_timestamp = time();
-my $cert_cache_fingerprints = {};
+    my $cert;
+    eval {
+       my $bio = Net::SSLeay::BIO_new_file($cert_path, 'r');
+       $cert = Net::SSLeay::PEM_read_bio_X509($bio);
+       Net::SSLeay::BIO_free($bio);
+    };
+    if (my $err = $@) {
+       die "unable to read certificate '$cert_path' - $err\n";
+    }
 
-sub update_cert_cache {
-    my ($update_node, $clear) = @_;
+    if (!defined($cert)) {
+       die "unable to read certificate '$cert_path' - got empty value\n";
+    }
 
-    syslog('info', "Clearing outdated entries from certificate cache")
-       if $clear;
+    my $fp;
+    eval {
+       $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256');
+    };
+    if (my $err = $@) {
+       die "unable to get fingerprint for '$cert_path' - $err\n";
+    }
 
-    $cert_cache_timestamp = time() if !defined($update_node);
+    if (!defined($fp) || $fp eq '') {
+       die "unable to get fingerprint for '$cert_path' - got empty value\n";
+    }
 
-    my $node_list = defined($update_node) ?
-       [ $update_node ] : [ keys %$cert_cache_nodes ];
+    return $fp;
+}
 
-    my $clear_node = sub {
-       my ($node) = @_;
-       if (my $old_fp = $cert_cache_nodes->{$node}) {
-           # distrust old fingerprint
-           delete $cert_cache_fingerprints->{$old_fp};
-           # ensure reload on next proxied request
-           delete $cert_cache_nodes->{$node};
-       }
-    };
+my $hostrsapubkey_fn = '/etc/ssh/ssh_host_rsa_key.pub';
+my $rootrsakey_fn = '/root/.ssh/id_rsa';
+my $rootrsapubkey_fn = '/root/.ssh/id_rsa.pub';
+
+sub read_local_cluster_info {
+
+    my $res = {};
+
+    my $hostrsapubkey = PVE::Tools::file_read_firstline($hostrsapubkey_fn);
+    $hostrsapubkey =~ s/^.*ssh-rsa\s+//i;
+    $hostrsapubkey =~ s/\s+root\@\S+\s*$//i;
+
+    die "unable to parse ${hostrsapubkey_fn}\n"
+       if $hostrsapubkey !~ m/^[A-Za-z0-9\.\/\+]{200,}$/;
 
     my $nodename = PVE::INotify::nodename();
 
-    foreach my $node (@$node_list) {
+    $res->{name} = $nodename;
 
-       if ($node ne $nodename) {
-           &$clear_node($node) if $clear;
-           next;
-       }
+    $res->{ip} = PMG::Utils::lookup_node_ip($nodename);
 
-       my $cert_path = "/etc/proxmox/pmg-api.pem";
+    $res->{hostrsapubkey} = $hostrsapubkey;
 
-       my $cert;
-       eval {
-           my $bio = Net::SSLeay::BIO_new_file($cert_path, 'r');
-           $cert = Net::SSLeay::PEM_read_bio_X509($bio);
-           Net::SSLeay::BIO_free($bio);
-       };
-       my $err = $@;
-       if ($err || !defined($cert)) {
-           &$clear_node($node) if $clear;
-           next;
-       }
+    if (! -f $rootrsapubkey_fn) {
+       unlink $rootrsakey_fn;
+       my $cmd = ['ssh-keygen', '-t', 'rsa', '-N', '', '-b', '2048',
+                  '-f', $rootrsakey_fn];
+       PMG::Utils::run_silent_cmd($cmd);
+    }
 
-       my $fp;
-       eval {
-           $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256');
-       };
-       $err = $@;
-       if ($err || !defined($fp) || $fp eq '') {
-           &$clear_node($node) if $clear;
-           next;
-       }
+    my $rootrsapubkey = PVE::Tools::file_read_firstline($rootrsapubkey_fn);
+    $rootrsapubkey =~ s/^.*ssh-rsa\s+//i;
+    $rootrsapubkey =~ s/\s+root\@\S+\s*$//i;
+
+    die "unable to parse ${rootrsapubkey_fn}\n"
+       if $rootrsapubkey !~ m/^[A-Za-z0-9\.\/\+]{200,}$/;
+
+    $res->{rootrsapubkey} = $rootrsapubkey;
+
+    $res->{fingerprint} = read_local_ssl_cert_fingerprint();
+
+    return $res;
+}
+
+# X509 Certificate cache helper
+
+my $cert_cache_nodes = {};
+my $cert_cache_timestamp = time();
+my $cert_cache_fingerprints = {};
+
+sub update_cert_cache {
 
-       my $old_fp = $cert_cache_nodes->{$node};
-       $cert_cache_fingerprints->{$fp} = 1;
-       $cert_cache_nodes->{$node} = $fp;
+    $cert_cache_timestamp = time();
 
-       if (defined($old_fp) && $fp ne $old_fp) {
-           delete $cert_cache_fingerprints->{$old_fp};
+    $cert_cache_fingerprints = {};
+    $cert_cache_nodes = {};
+
+    my $cinfo = PMG::ClusterConfig->new();
+
+    foreach my $entry (values %{$cinfo->{ids}}) {
+       my $node = $entry->{name};
+       my $fp = $entry->{fingerprint};
+       if ($node && $fp) {
+           $cert_cache_fingerprints->{$fp} = 1;
+           $cert_cache_nodes->{$node} = $fp;
        }
     }
 }
@@ -135,7 +153,7 @@ sub update_cert_cache {
 sub initialize_cert_cache {
     my ($node) = @_;
 
-    update_cert_cache($node)
+    update_cert_cache()
        if defined($node) && !defined($cert_cache_nodes->{$node});
 }
 
@@ -143,7 +161,7 @@ sub check_cert_fingerprint {
     my ($cert) = @_;
 
     # clear cache every 30 minutes at least
-    update_cert_cache(undef, 1) if time() - $cert_cache_timestamp >= 60*30;
+    update_cert_cache() if time() - $cert_cache_timestamp >= 60*30;
 
     # get fingerprint of server certificate
     my $fp;
@@ -159,177 +177,771 @@ sub check_cert_fingerprint {
        return 0;
     };
 
-    return 1 if &$check();
+    return 1 if $check->();
 
     # clear cache and retry at most once every minute
     if (time() - $cert_cache_timestamp >= 60) {
        syslog ('info', "Could not verify remote node certificate '$fp' with list of pinned certificates, refreshing cache");
        update_cert_cache();
-       return &$check();
+       return $check->();
     }
 
     return 0;
 }
 
-sub read_cluster_conf {
-    my ($filename, $fh) = @_;
+my $sshglobalknownhosts = "/etc/ssh/ssh_known_hosts2";
+my $rootsshauthkeys = "/root/.ssh/authorized_keys";
+my $ssh_rsa_id = "/root/.ssh/id_rsa.pub";
 
-    my $localname = PVE::INotify::nodename();
-    my $localip = lookup_node_ip($localname);
+sub update_ssh_keys {
+    my ($cinfo) = @_;
 
-    my $level = 0;
-    my $maxcid = 0;
+    my $old = '';
+    my $data = '';
 
-    my $cinfo;
+    foreach my $node (values %{$cinfo->{ids}}) {
+       $data .= "$node->{ip} ssh-rsa $node->{hostrsapubkey}\n";
+       $data .= "$node->{name} ssh-rsa $node->{hostrsapubkey}\n";
+    }
 
-    $cinfo->{nodes} = [];
-    $cinfo->{remnodes} = [];
+    $old = PVE::Tools::file_get_contents($sshglobalknownhosts, 1024*1024)
+       if -f $sshglobalknownhosts;
 
-    $cinfo->{local} = {
-       role => '-',
-       cid => 0,
-       ip => $localip,
-       name => $localname,
-       configport => 83,
-       dbport => 5432,
-    };
+    PVE::Tools::file_set_contents($sshglobalknownhosts, $data)
+       if $old ne $data;
 
-    # fixme: add test is local node is part of node list
-    if (defined($fh)) {
+    $data = '';
+    $old = '';
 
-       $cinfo->{exists} = 1; # cluster configuratin file exists and is readable
-
-       while (defined(my $line = <$fh>)) {
-           chomp $line;
+    # always add ourself
+    if (-f $ssh_rsa_id) {
+       my $pub = PVE::Tools::file_get_contents($ssh_rsa_id);
+       chomp($pub);
+       $data .= "$pub\n";
+    }
 
-           next if $line =~ m/^\s*$/; # skip empty lines
+    foreach my $node (values %{$cinfo->{ids}}) {
+       $data .= "ssh-rsa $node->{rootrsapubkey} root\@$node->{name}\n";
+    }
 
-           if ($line =~ m/^maxcid\s+(\d+)\s*$/i) {
-               $maxcid = $1 > $maxcid ? $1 : $maxcid;
-               next;
-           }
+    if (-f $rootsshauthkeys) {
+       my $mykey = PVE::Tools::file_get_contents($rootsshauthkeys, 128*1024);
+       chomp($mykey);
+       $data .= "$mykey\n";
+    }
 
-           if ($line =~ m/^(master|node)\s+(\d+)\s+\{\s*$/i) {
-               $level++;
-               my ($t, $cid) = (lc($1), $2);
-
-               $maxcid = $cid > $maxcid ? $cid : $maxcid;
-
-               my $res = {
-                   role => $t eq 'master' ? 'M' : 'N',
-                   cid => $cid
-               };
-
-               while (defined($line = <$fh>)) {
-                   chomp $line;
-                   next if $line =~ m/^\s*$/; # skip empty lines
-                   if ($line =~ m/^\}\s*$/) {
-                       $level--;
-                       last;
-                   }
-
-                   if ($line =~ m/^\s*(\S+)\s*:\s*(\S+)\s*$/) {
-                       my ($n, $v) = (lc $1, $2);
-
-                       # fixme: do syntax checks
-                       if ($n eq 'ip') {
-                           $res->{$n} = $v;
-                       } elsif ($n eq 'name') {
-                           $res->{$n} = $v;
-                       } elsif ($n eq 'hostrsapubkey') {
-                           $res->{$n} = $v;
-                       } elsif ($n eq 'rootrsapubkey') {
-                           $res->{$n} = $v;
-                       } else {
-                           die "syntax error in configuration file\n";
-                       }
-                   } else {
-                       die "syntax error in configuration file\n";
-                   }
-               }
-
-               die "missing ip address for node '$cid'\n" if !$res->{ip};
-               die "missing name for node '$cid'\n" if !$res->{name};
-               #die "missing host RSA key for node '$cid'\n" if !$res->{hostrsapubkey};
-               #die "missing user RSA key for node '$cid'\n" if !$res->{rootrsapubkey};
-
-               push @{$cinfo->{nodes}}, $res;
-
-               if ($res->{role} eq 'M') {
-                   $cinfo->{master} = $res;
-               }
-
-               if ($res->{ip} eq $localname) {
-                   $cinfo->{local} = $res;
-               }
-           } else {
-               die "syntax error in configuration file\n";
-           }
+    my $newdata = "";
+    my $vhash = {};
+    my @lines = split(/\n/, $data);
+    foreach my $line (@lines) {
+       if ($line !~ /^#/ && $line =~ m/(^|\s)ssh-(rsa|dsa)\s+(\S+)\s+\S+$/) {
+            next if $vhash->{$3}++;
        }
+       $newdata .= "$line\n";
+    }
+
+    $old = PVE::Tools::file_get_contents($rootsshauthkeys, 1024*1024)
+       if -f $rootsshauthkeys;
+
+    PVE::Tools::file_set_contents($rootsshauthkeys, $newdata, 0600)
+       if $old ne $newdata;
+}
+
+my $cfgdir = '/etc/pmg';
+my $syncdir = "$cfgdir/master";
+
+my $cond_commit_synced_file = sub {
+    my ($filename, $dstfn) = @_;
+
+    $dstfn = "$cfgdir/$filename" if !defined($dstfn);
+    my $srcfn = "$syncdir/$filename";
+
+    if (! -f $srcfn) {
+       unlink $dstfn;
+       return;
     }
 
-    die "syntax error in configuration file\n" if $level;
+    my $new = PVE::Tools::file_get_contents($srcfn, 1024*1024);
+
+    if (-f $dstfn) {
+       my $old = PVE::Tools::file_get_contents($dstfn, 1024*1024);
+       return 0 if $new eq $old;
+    }
+
+    # set mtime (touch) to avoid time drift problems
+    utime(undef, undef, $srcfn);
+
+    rename($srcfn, $dstfn) ||
+       die "cond_rename_file '$filename' failed - $!\n";
+
+    print STDERR "updated $dstfn\n";
+
+    return 1;
+};
+
+my $rsync_command = sub {
+    my ($host_key_alias, @args) = @_;
+
+    my $ssh_cmd = '--rsh=ssh -l root -o BatchMode=yes';
+    $ssh_cmd .=  " -o HostKeyAlias=${host_key_alias}" if $host_key_alias;
+
+    my $cmd = ['rsync', $ssh_cmd,  '-q', @args];
+
+    return $cmd;
+};
+
+sub sync_quarantine_files {
+    my ($host_ip, $host_name, $flistname, $rcid) = @_;
+
+    my $spooldir = $PMG::MailQueue::spooldir;
+
+    mkdir "$spooldir/cluster/";
+    my $syncdir = "$spooldir/cluster/$rcid";
+    mkdir $syncdir;
+
+    my $cmd = $rsync_command->(
+       $host_name, '--timeout', '10', "${host_ip}:$spooldir", $spooldir,
+       '--files-from', $flistname);
+
+    PVE::Tools::run_command($cmd);
+}
+
+sub sync_spooldir {
+    my ($host_ip, $host_name, $rcid) = @_;
+
+    my $spooldir = $PMG::MailQueue::spooldir;
 
-    $cinfo->{maxcid} = $maxcid;
+    mkdir "$spooldir/cluster/";
+    my $syncdir = "$spooldir/cluster/$rcid";
+    mkdir $syncdir;
 
-    my @cidlist = ();
-    foreach my $ni (@{$cinfo->{nodes}}) {
-       next if $cinfo->{local}->{cid} == $ni->{cid}; # skip local CID
-       push @cidlist, $ni->{cid};
+    my $cmd = $rsync_command->(
+       $host_name, '-aq', '--timeout', '10', "${host_ip}:$syncdir/", $syncdir);
+
+    foreach my $incl (('spam/', 'spam/*', 'spam/*/*', 'virus/', 'virus/*', 'virus/*/*')) {
+       push @$cmd, '--include', $incl;
     }
 
-    my $ind = 0;
-    my $portid = {};
-    foreach my $cid (sort @cidlist) {
-       $portid->{$cid} = $ind;
-       $ind++;
+    push @$cmd, '--exclude', '*';
+
+    PVE::Tools::run_command($cmd);
+}
+
+sub sync_master_quar {
+    my ($host_ip, $host_name) = @_;
+
+    my $spooldir = $PMG::MailQueue::spooldir;
+
+    my $syncdir = "$spooldir/cluster/";
+    mkdir $syncdir;
+
+    my $cmd = $rsync_command->(
+       $host_name, '-aq', '--timeout', '10', "${host_ip}:$syncdir", $syncdir);
+
+    PVE::Tools::run_command($cmd);
+}
+
+sub sync_config_from_master {
+    my ($master_name, $master_ip, $noreload) = @_;
+
+    mkdir $syncdir;
+    File::Path::remove_tree($syncdir, {keep_root => 1});
+
+    my $sa_conf_dir = "/etc/mail/spamassassin";
+    my $sa_custom_cf = "custom.cf";
+
+    my $cmd = $rsync_command->(
+       $master_name, '-aq',
+       "${master_ip}:$cfgdir/* ${sa_conf_dir}/${sa_custom_cf}",
+       "$syncdir/",
+       '--exclude', 'master/',
+       '--exclude', '*~',
+       '--exclude', '*.db',
+       '--exclude', 'pmg-api.pem',
+       '--exclude', 'pmg-tls.pem',
+       );
+
+    my $errmsg = "syncing master configuration from '${master_ip}' failed";
+    PVE::Tools::run_command($cmd, errmsg => $errmsg);
+
+    # verify that the remote host is cluster master
+    open (my $fh, '<', "$syncdir/cluster.conf") ||
+       die "unable to open synced cluster.conf - $!\n";
+
+    my $cinfo = PMG::ClusterConfig::read_cluster_conf('cluster.conf', $fh);
+
+    if (!$cinfo->{master} || ($cinfo->{master}->{ip} ne $master_ip)) {
+       die "host '$master_ip' is not cluster master\n";
     }
 
-    foreach my $ni (@{$cinfo->{nodes}}) {
-       # fixme: do we still need those ports?
-       $ni->{configport} = $ni->{cid} == $cinfo->{local}->{cid} ? 83 : 50000 + $portid->{$ni->{cid}};
-       $ni->{dbport} = $ni->{cid} == $cinfo->{local}->{cid} ? 5432 : 50100 + $portid->{$ni->{cid}};
+    my $role = $cinfo->{'local'}->{type} // '-';
+    die "local node '$cinfo->{local}->{name}' not part of cluster\n"
+       if $role eq '-';
+
+    die "local node '$cinfo->{local}->{name}' is new cluster master\n"
+       if $role eq 'master';
+
+    $cond_commit_synced_file->('cluster.conf');
+
+    update_ssh_keys($cinfo); # rewrite ssh keys
+
+    PMG::Fetchmail::update_fetchmail_default(0); # disable on slave
+
+    my $files = [
+       'pmg-authkey.key',
+       'pmg-authkey.pub',
+       'pmg-csrf.key',
+       'ldap.conf',
+       'user.conf',
+       'domains',
+       'mynetworks',
+       'transport',
+       'tls_policy',
+       'fetchmailrc',
+       ];
+
+    foreach my $filename (@$files) {
+       $cond_commit_synced_file->($filename);
+    }
+
+
+    my $force_restart = {};
+
+    if ($cond_commit_synced_file->($sa_custom_cf, "${sa_conf_dir}/${sa_custom_cf}")) {
+       $force_restart->{spam} = 1;
     }
 
-    foreach my $ni (@{$cinfo->{nodes}}) {
-       next if $ni->{cid} == $cinfo->{local}->{cid};
-       push @{$cinfo->{remnodes}}, $ni->{cid};
+    $cond_commit_synced_file->('pmg.conf');
+
+    # sync user templates files/symlinks (not recursive)
+    my $srcdir = "$syncdir/templates";
+    if (-d $srcdir) {
+       my $dstdir = "$cfgdir/templates";
+       mkdir $dstdir;
+       my $names_hash = {};
+       foreach my $fn (<$srcdir/*>) {
+           next if $fn !~ m|^($srcdir/(.*))$|;
+           $fn = $1; # untaint;
+           my $name = $2;
+           $names_hash->{$name} = 1;
+           my $target = "$dstdir/$name";
+           if (-f $fn) {
+               $cond_commit_synced_file->("templates/$name", $target);
+           } elsif (-l $fn) {
+               warn "update $target failed - $!\n" if !rename($fn, $target);
+           }
+       }
+       # remove vanished files
+       foreach my $fn (<$dstdir/*>) {
+           next if $fn !~ m|^($dstdir/(.*))$|;
+           $fn = $1; # untaint;
+           my $name = $2;
+           next if $names_hash->{$name};
+           warn "unlink $fn failed - $!\n" if !unlink($fn);
+       }
+    }
+}
+
+sub sync_ruledb_from_master {
+    my ($ldb, $rdb, $ni, $ticket) = @_;
+
+    my $ruledb = PMG::RuleDB->new($ldb);
+    my $rulecache = PMG::RuleCache->new($ruledb);
+
+    my $conn = PVE::APIClient::LWP->new(
+       ticket => $ticket,
+       cookie_name => 'PMGAuthCookie',
+       host => $ni->{ip},
+       cached_fingerprints => {
+           $ni->{fingerprint} => 1,
+       });
+
+    my $digest = $conn->get("/config/ruledb/digest", {});
+
+    return if $digest eq $rulecache->{digest}; # no changes
+
+    syslog('info', "detected rule database changes - starting sync from '$ni->{ip}'");
+
+    eval {
+       $ldb->begin_work;
+
+       $ldb->do("DELETE FROM Rule");
+       $ldb->do("DELETE FROM RuleGroup");
+       $ldb->do("DELETE FROM ObjectGroup");
+       $ldb->do("DELETE FROM Object");
+       $ldb->do("DELETE FROM Attribut");
+
+       eval {
+           $rdb->begin_work;
+
+           # read a consistent snapshot
+           $rdb->do("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
+
+           PMG::DBTools::copy_table($ldb, $rdb, "Rule");
+           PMG::DBTools::copy_table($ldb, $rdb, "RuleGroup");
+           PMG::DBTools::copy_table($ldb, $rdb, "ObjectGroup");
+           PMG::DBTools::copy_table($ldb, $rdb, "Object", 'value');
+           PMG::DBTools::copy_table($ldb, $rdb, "Attribut", 'value');
+       };
+
+       $rdb->rollback; # end transaction
+
+       die $@ if $@;
+
+       # update sequences
+
+       $ldb->do("SELECT setval('rule_id_seq', max(id)+1) FROM Rule");
+       $ldb->do("SELECT setval('object_id_seq', max(id)+1) FROM Object");
+       $ldb->do("SELECT setval('objectgroup_id_seq', max(id)+1) FROM ObjectGroup");
+
+       $ldb->commit;
+    };
+    if (my $err = $@) {
+       $ldb->rollback;
+       die $err;
+    }
+
+    syslog('info', "finished rule database sync from host '$ni->{ip}'");
+}
+
+sub sync_quarantine_db {
+    my ($ldb, $rdb, $ni, $rsynctime_ref) = @_;
+
+    my $rcid = $ni->{cid};
+
+    my $maxmails = 100000;
+
+    my $mscount = 0;
+
+    my $ctime = PMG::DBTools::get_remote_time($rdb);
+
+    my $maxcount = 1000;
+
+    my $count;
+
+    PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastid_CMailStore', -1, undef);
+
+    do { # get new values
+
+       $count = 0;
+
+       my $flistname = "/tmp/quarantinefilelist.$$";
+
+       eval {
+           $ldb->begin_work;
+
+           open(my $flistfh, '>', $flistname) ||
+               die "unable to open file '$flistname' - $!\n";
+
+           my $lastid = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastid_CMailStore');
+
+           # sync CMailStore
+
+           my $sth = $rdb->prepare(
+               "SELECT * from CMailstore WHERE cid = ? AND rid > ? " .
+               "ORDER BY cid,rid LIMIT ?");
+           $sth->execute($rcid, $lastid, $maxcount);
+
+           my $maxid;
+           my $callback = sub {
+               my $ref = shift;
+               $maxid = $ref->{rid};
+               my $filename = $ref->{file};
+                # skip files generated before cluster was created
+               return if $filename !~ m!^cluster/!;
+               print $flistfh "$filename\n";
+           };
+
+           my $attrs = [qw(cid rid time qtype bytes spamlevel info sender header file)];
+           $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CMailStore', $attrs, $callback);
+
+           close($flistfh);
+
+           my $starttime = [ gettimeofday() ];
+           sync_quarantine_files($ni->{ip}, $ni->{name}, $flistname, $rcid);
+           $$rsynctime_ref += tv_interval($starttime);
+
+           if ($maxid) {
+               # sync CMSReceivers
+
+               $sth = $rdb->prepare(
+                   "SELECT * from CMSReceivers WHERE " .
+                   "CMailStore_CID = ? AND CMailStore_RID > ?  " .
+                   "AND CMailStore_RID <= ?");
+               $sth->execute($rcid, $lastid, $maxid);
+
+               $attrs = [qw(cmailstore_cid cmailstore_rid pmail receiver ticketid status mtime)];
+               PMG::DBTools::copy_selected_data($ldb, $sth, 'CMSReceivers', $attrs);
+
+               PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastid_CMailStore', $maxid);
+           }
+
+           $ldb->commit;
+       };
+       my $err = $@;
+
+       unlink $flistname;
+
+       if ($err) {
+           $ldb->rollback;
+           die $err;
+       }
+
+       $mscount += $count;
+
+    } while (($count >= $maxcount) && ($mscount < $maxmails));
+
+    PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastmt_CMSReceivers', 0, undef);
+
+    eval { # synchronize status updates
+       $ldb->begin_work;
+
+       my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers');
+
+       my $sth = $rdb->prepare ("SELECT * from CMSReceivers WHERE mtime >= ? AND status != 'N'");
+       $sth->execute($lastmt);
+
+       my $update_sth = $ldb->prepare(
+           "UPDATE CMSReceivers SET status = ? WHERE " .
+           "CMailstore_CID = ? AND CMailstore_RID = ? AND TicketID = ?");
+       while (my $ref = $sth->fetchrow_hashref()) {
+           $update_sth->execute($ref->{status}, $ref->{cmailstore_cid},
+                                $ref->{cmailstore_rid}, $ref->{ticketid});
+       }
+
+       PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers', $ctime);
+
+       $ldb->commit;
+    };
+    if (my $err = $@) {
+       $ldb->rollback;
+       die $err;
     }
 
-    return $cinfo;
+    return $mscount;
 }
 
-sub write_cluster_conf {
-    my ($filename, $fh, $cinfo) = @_;
-
-    my $raw = "maxcid $cinfo->{maxcid}\n\n";
-
-    foreach my $ni (@{$cinfo->{nodes}}) {
-
-       if ($ni->{role} eq 'M') {
-           $raw .= "master $ni->{cid} {\n";
-           $raw .= " IP: $ni->{ip}\n";
-           $raw .= " NAME: $ni->{name}\n";
-           $raw .= " HOSTRSAPUBKEY: $ni->{hostrsapubkey}\n";
-           $raw .= " ROOTRSAPUBKEY: $ni->{rootrsapubkey}\n";
-           $raw .= "}\n\n";
-       } elsif ($ni->{role} eq 'N') {
-           $raw .= "node $ni->{cid} {\n";
-           $raw .= " IP: $ni->{ip}\n";
-           $raw .= " NAME: $ni->{name}\n";
-           $raw .= " HOSTRSAPUBKEY: $ni->{hostrsapubkey}\n";
-           $raw .= " ROOTRSAPUBKEY: $ni->{rootrsapubkey}\n";
-           $raw .= "}\n\n";
+sub sync_statistic_db {
+    my ($ldb, $rdb, $ni) = @_;
+
+    my $rcid = $ni->{cid};
+
+    my $maxmails = 100000;
+
+    my $mscount = 0;
+
+    my $maxcount = 1000;
+
+    my $count;
+
+    PMG::DBTools::create_clusterinfo_default(
+       $ldb, $rcid, 'lastid_CStatistic', -1, undef);
+
+    do { # get new values
+
+       $count = 0;
+
+       eval {
+           $ldb->begin_work;
+
+           my $lastid = PMG::DBTools::read_int_clusterinfo(
+               $ldb, $rcid, 'lastid_CStatistic');
+
+           # sync CStatistic
+
+           my $sth = $rdb->prepare(
+               "SELECT * from CStatistic " .
+               "WHERE cid = ? AND rid > ? " .
+               "ORDER BY cid, rid LIMIT ?");
+           $sth->execute($rcid, $lastid, $maxcount);
+
+           my $maxid;
+           my $callback = sub {
+               my $ref = shift;
+               $maxid = $ref->{rid};
+           };
+
+           my $attrs = [qw(cid rid time bytes direction spamlevel ptime virusinfo sender)];
+           $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CStatistic', $attrs, $callback);
+
+           if ($maxid) {
+               # sync CReceivers
+
+               $sth = $rdb->prepare(
+                   "SELECT * from CReceivers WHERE " .
+                   "CStatistic_CID = ? AND CStatistic_RID > ? AND CStatistic_RID <= ?");
+               $sth->execute($rcid, $lastid, $maxid);
+
+               $attrs = [qw(cstatistic_cid cstatistic_rid blocked receiver)];
+               PMG::DBTools::copy_selected_data($ldb, $sth, 'CReceivers', $attrs);
+           }
+
+           PMG::DBTools::write_maxint_clusterinfo ($ldb, $rcid, 'lastid_CStatistic', $maxid);
+
+           $ldb->commit;
+       };
+       if (my $err = $@) {
+           $ldb->rollback;
+           die $err;
+       }
+
+       $mscount += $count;
+
+    } while (($count >= $maxcount) && ($mscount < $maxmails));
+
+    return $mscount;
+}
+
+my $sync_generic_mtime_db = sub {
+    my ($ldb, $rdb, $ni, $table, $selectfunc, $mergefunc) = @_;
+
+    my $ctime = PMG::DBTools::get_remote_time($rdb);
+
+    PMG::DBTools::create_clusterinfo_default($ldb, $ni->{cid}, "lastmt_$table", 0, undef);
+
+    my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $ni->{cid}, "lastmt_$table");
+
+    my $sql_cmd = $selectfunc->($ctime, $lastmt);
+
+    my $sth = $rdb->prepare($sql_cmd);
+
+    $sth->execute();
+
+    my $updates = 0;
+
+    eval {
+       # use transaction to speedup things
+       my $max = 1000; # UPDATE MAX ENTRIES AT ONCE
+       my $count = 0;
+       while (my $ref = $sth->fetchrow_hashref()) {
+           $ldb->begin_work if !$count;
+           $mergefunc->($ref);
+           if (++$count >= $max) {
+               $count = 0;
+               $ldb->commit;
+           }
+           $updates++;
        }
+
+       $ldb->commit if $count;
+    };
+    if (my $err = $@) {
+       $ldb->rollback;
+       die $err;
     }
 
-    PVE::Tools::safe_print($filename, $fh, $raw);
+    PMG::DBTools::write_maxint_clusterinfo($ldb, $ni->{cid}, "lastmt_$table", $ctime);
+
+    return $updates;
+};
+
+sub sync_localstat_db {
+    my ($dbh, $rdb, $ni) = @_;
+
+    my $rcid = $ni->{cid};
+
+    my $selectfunc = sub {
+       my ($ctime, $lastmt) = @_;
+       return "SELECT * from LocalStat WHERE mtime >= $lastmt AND cid = $rcid";
+    };
+
+    my $merge_sth = $dbh->prepare(
+       'INSERT INTO LocalStat (Time, RBLCount, PregreetCount, CID, MTime) ' .
+       'VALUES (?, ?, ?, ?, ?) ' .
+       'ON CONFLICT (Time, CID) DO UPDATE SET ' .
+       'RBLCount = excluded.RBLCount, PregreetCount = excluded.PregreetCount, MTime = excluded.MTime');
+
+    my $mergefunc = sub {
+       my ($ref) = @_;
+
+       $merge_sth->execute($ref->{time}, $ref->{rblcount}, $ref->{pregreetcount}, $ref->{cid}, $ref->{mtime});
+    };
+
+    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'LocalStat', $selectfunc, $mergefunc);
+}
+
+sub sync_greylist_db {
+    my ($dbh, $rdb, $ni) = @_;
+
+    my $selectfunc = sub {
+       my ($ctime, $lastmt) = @_;
+       return "SELECT * from CGreylist WHERE extime >= $ctime AND " .
+           "mtime >= $lastmt AND CID != 0";
+    };
+
+    my $merge_sth = $dbh->prepare($PMG::DBTools::cgreylist_merge_sql);
+    my $mergefunc = sub {
+       my ($ref) = @_;
+
+       $merge_sth->execute(
+           $ref->{ipnet}, $ref->{host}, $ref->{sender}, $ref->{receiver},
+           $ref->{instance}, $ref->{rctime}, $ref->{extime}, $ref->{delay},
+           $ref->{blocked}, $ref->{passed}, 0, $ref->{cid});
+    };
+
+    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'CGreylist', $selectfunc, $mergefunc);
+}
+
+sub sync_userprefs_db {
+    my ($dbh, $rdb, $ni) = @_;
+
+    my $selectfunc = sub {
+       my ($ctime, $lastmt) = @_;
+
+       return "SELECT * from UserPrefs WHERE mtime >= $lastmt";
+    };
+
+    my $merge_sth = $dbh->prepare(
+       "INSERT INTO UserPrefs (PMail, Name, Data, MTime) " .
+       'VALUES (?, ?, ?, ?) ' .
+       'ON CONFLICT (PMail, Name) DO UPDATE SET ' .
+       # Note: MTime = 0 ==> this is just a copy from somewhere else, not modified
+       'MTime = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN 0 ELSE UserPrefs.MTime END, ' .
+       'Data = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN excluded.Data ELSE UserPrefs.Data END');
+
+    my $mergefunc = sub {
+       my ($ref) = @_;
+
+       $merge_sth->execute($ref->{pmail}, $ref->{name}, $ref->{data}, $ref->{mtime});
+    };
+
+    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'UserPrefs', $selectfunc, $mergefunc);
+}
+
+sub sync_domainstat_db {
+    my ($dbh, $rdb, $ni) = @_;
+
+    my $selectfunc = sub {
+       my ($ctime, $lastmt) = @_;
+       return "SELECT * from DomainStat WHERE mtime >= $lastmt";
+    };
+
+    my $merge_sth = $dbh->prepare(
+       'INSERT INTO Domainstat ' .
+       '(Time,Domain,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
+       'BouncesIn,BouncesOut,PTimeSum,Mtime) ' .
+       'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
+       'ON CONFLICT (Time, Domain) DO UPDATE SET ' .
+       'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
+       'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
+       'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
+       'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
+       'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
+       'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
+
+    my $mergefunc = sub {
+       my ($ref) = @_;
+
+       $merge_sth->execute(
+           $ref->{time}, $ref->{domain}, $ref->{countin}, $ref->{countout},
+           $ref->{bytesin}, $ref->{bytesout},
+           $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout},
+           $ref->{bouncesin}, $ref->{bouncesout}, $ref->{ptimesum}, $ref->{mtime});
+    };
+
+    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DomainStat', $selectfunc, $mergefunc);
+}
+
+sub sync_dailystat_db {
+    my ($dbh, $rdb, $ni) = @_;
+
+    my $selectfunc = sub {
+       my ($ctime, $lastmt) = @_;
+       return "SELECT * from DailyStat WHERE mtime >= $lastmt";
+    };
+
+    my $merge_sth = $dbh->prepare(
+       'INSERT INTO DailyStat ' .
+       '(Time,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
+       'BouncesIn,BouncesOut,GreylistCount,SPFCount,RBLCount,PTimeSum,Mtime) ' .
+       'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
+       'ON CONFLICT (Time) DO UPDATE SET ' .
+       'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
+       'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
+       'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
+       'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
+       'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
+       'GreylistCount = excluded.GreylistCount, SPFCount = excluded.SpfCount, ' .
+       'RBLCount = excluded.RBLCount, ' .
+       'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
+
+    my $mergefunc = sub {
+       my ($ref) = @_;
+
+       $merge_sth->execute(
+           $ref->{time}, $ref->{countin}, $ref->{countout},
+           $ref->{bytesin}, $ref->{bytesout},
+           $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout},
+           $ref->{bouncesin}, $ref->{bouncesout}, $ref->{greylistcount},
+           $ref->{spfcount}, $ref->{rblcount}, $ref->{ptimesum}, $ref->{mtime});
+    };
+
+    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DailyStat', $selectfunc, $mergefunc);
+}
+
+sub sync_virusinfo_db {
+    my ($dbh, $rdb, $ni) = @_;
+
+    my $selectfunc = sub {
+       my ($ctime, $lastmt) = @_;
+       return "SELECT * from VirusInfo WHERE mtime >= $lastmt";
+    };
+
+    my $merge_sth = $dbh->prepare(
+       'INSERT INTO VirusInfo (Time,Name,Count,MTime) ' .
+       'VALUES (?,?,?,?) ' .
+       'ON CONFLICT (Time,Name) DO UPDATE SET ' .
+       'Count = excluded.Count , MTime = excluded.MTime');
+
+    my $mergefunc = sub {
+       my ($ref) = @_;
+
+       $merge_sth->execute($ref->{time}, $ref->{name}, $ref->{count}, $ref->{mtime});
+    };
+
+    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'VirusInfo', $selectfunc, $mergefunc);
+}
+
+sub sync_deleted_nodes_from_master {
+    my ($ldb, $masterdb, $cinfo, $masterni, $rsynctime_ref) = @_;
+
+    my $rsynctime = 0;
+
+    my $cid_hash = {}; # fast lookup
+    foreach my $ni (values %{$cinfo->{ids}}) {
+       $cid_hash->{$ni->{cid}} = $ni;
+    }
+
+    my $spooldir = $PMG::MailQueue::spooldir;
+
+    my $maxcid = $cinfo->{master}->{maxcid} // 0;
+
+    for (my $rcid = 1; $rcid <= $maxcid; $rcid++) {
+       next if $cid_hash->{$rcid};
+
+       my $done_marker = "$spooldir/cluster/$rcid/.synced-deleted-node";
+
+       next if -f $done_marker; # already synced
+
+       syslog('info', "syncing deleted node $rcid from master '$masterni->{ip}'");
+
+       my $starttime = [ gettimeofday() ];
+       sync_spooldir($masterni->{ip}, $masterni->{name}, $rcid);
+       $$rsynctime_ref += tv_interval($starttime);
+
+       my $fake_ni = {
+           ip => $masterni->{ip},
+           name => $masterni->{name},
+           cid => $rcid,
+       };
+
+       sync_quarantine_db($ldb, $masterdb, $fake_ni);
+
+       sync_statistic_db ($ldb, $masterdb, $fake_ni);
+
+       open(my $fh, ">>",  $done_marker);
+   }
 }
 
-PVE::INotify::register_file('cluster.conf', "/etc/proxmox/cluster.conf",
-                           \&read_cluster_conf,
-                           \&write_cluster_conf,
-                           undef,
-                           always_call_parser => 1);
 
 1;