]> git.proxmox.com Git - pmg-api.git/blame - PMG/Cluster.pm
bump default maxspamsize to 1M
[pmg-api.git] / PMG / Cluster.pm
CommitLineData
0854fb22
DM
1package PMG::Cluster;
2
3use strict;
4use warnings;
45e68618 5use Data::Dumper;
0854fb22 6use Socket;
cfdf6608 7use File::Path;
9430b6d4 8use Time::HiRes qw (gettimeofday tv_interval);
45e68618 9
a7c7cad7 10use PVE::SafeSyslog;
0854fb22
DM
11use PVE::Tools;
12use PVE::INotify;
0757859a 13use PVE::APIClient::LWP;
0854fb22 14
1fb2ab76 15use PMG::Utils;
a7c7cad7 16use PMG::Config;
9f67f5b3 17use PMG::ClusterConfig;
db303db4
DM
18use PMG::RuleDB;
19use PMG::RuleCache;
9430b6d4 20use PMG::MailQueue;
0757859a 21use PMG::Fetchmail;
0854fb22 22
8737f93a
DM
23sub remote_node_ip {
24 my ($nodename, $noerr) = @_;
25
d8782874 26 my $cinfo = PMG::ClusterConfig->new();
8737f93a 27
45e68618 28 foreach my $entry (values %{$cinfo->{ids}}) {
8737f93a
DM
29 if ($entry->{name} eq $nodename) {
30 my $ip = $entry->{ip};
31 return $ip if !wantarray;
32 my $family = PVE::Tools::get_host_address_family($ip);
33 return ($ip, $family);
34 }
35 }
36
37 # fallback: try to get IP by other means
9f67f5b3 38 return PMG::Utils::lookup_node_ip($nodename, $noerr);
8737f93a
DM
39}
40
d2e43f9e
DM
41sub get_master_node {
42 my ($cinfo) = @_;
43
d8782874 44 $cinfo = PMG::ClusterConfig->new() if !$cinfo;
d2e43f9e
DM
45
46 return $cinfo->{master}->{name} if defined($cinfo->{master});
47
48 return 'localhost';
49}
50
cba17aeb
DM
51sub read_local_ssl_cert_fingerprint {
52 my $cert_path = "/etc/pmg/pmg-api.pem";
0854fb22 53
cba17aeb
DM
54 my $cert;
55 eval {
56 my $bio = Net::SSLeay::BIO_new_file($cert_path, 'r');
57 $cert = Net::SSLeay::PEM_read_bio_X509($bio);
58 Net::SSLeay::BIO_free($bio);
59 };
60 if (my $err = $@) {
61 die "unable to read certificate '$cert_path' - $err\n";
62 }
0854fb22 63
cba17aeb
DM
64 if (!defined($cert)) {
65 die "unable to read certificate '$cert_path' - got empty value\n";
66 }
67
68 my $fp;
69 eval {
70 $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256');
71 };
72 if (my $err = $@) {
73 die "unable to get fingerprint for '$cert_path' - $err\n";
74 }
0854fb22 75
cba17aeb
DM
76 if (!defined($fp) || $fp eq '') {
77 die "unable to get fingerprint for '$cert_path' - got empty value\n";
78 }
0854fb22 79
cba17aeb
DM
80 return $fp;
81}
0854fb22 82
cba17aeb
DM
83my $hostrsapubkey_fn = '/etc/ssh/ssh_host_rsa_key.pub';
84my $rootrsakey_fn = '/root/.ssh/id_rsa';
85my $rootrsapubkey_fn = '/root/.ssh/id_rsa.pub';
0854fb22 86
cba17aeb
DM
87sub read_local_cluster_info {
88
89 my $res = {};
90
91 my $hostrsapubkey = PVE::Tools::file_read_firstline($hostrsapubkey_fn);
92 $hostrsapubkey =~ s/^.*ssh-rsa\s+//i;
93 $hostrsapubkey =~ s/\s+root\@\S+\s*$//i;
94
95 die "unable to parse ${hostrsapubkey_fn}\n"
96 if $hostrsapubkey !~ m/^[A-Za-z0-9\.\/\+]{200,}$/;
0854fb22
DM
97
98 my $nodename = PVE::INotify::nodename();
99
cba17aeb 100 $res->{name} = $nodename;
0854fb22 101
cba17aeb 102 $res->{ip} = PMG::Utils::lookup_node_ip($nodename);
0854fb22 103
cba17aeb 104 $res->{hostrsapubkey} = $hostrsapubkey;
0854fb22 105
cba17aeb
DM
106 if (! -f $rootrsapubkey_fn) {
107 unlink $rootrsakey_fn;
108 my $cmd = ['ssh-keygen', '-t', 'rsa', '-N', '', '-b', '2048',
109 '-f', $rootrsakey_fn];
1fb2ab76 110 PMG::Utils::run_silent_cmd($cmd);
cba17aeb
DM
111 }
112
113 my $rootrsapubkey = PVE::Tools::file_read_firstline($rootrsapubkey_fn);
114 $rootrsapubkey =~ s/^.*ssh-rsa\s+//i;
115 $rootrsapubkey =~ s/\s+root\@\S+\s*$//i;
116
117 die "unable to parse ${rootrsapubkey_fn}\n"
118 if $rootrsapubkey !~ m/^[A-Za-z0-9\.\/\+]{200,}$/;
119
120 $res->{rootrsapubkey} = $rootrsapubkey;
121
122 $res->{fingerprint} = read_local_ssl_cert_fingerprint();
123
124 return $res;
125}
126
127# X509 Certificate cache helper
128
129my $cert_cache_nodes = {};
130my $cert_cache_timestamp = time();
131my $cert_cache_fingerprints = {};
0854fb22 132
cba17aeb 133sub update_cert_cache {
0854fb22 134
cba17aeb
DM
135 $cert_cache_timestamp = time();
136
137 $cert_cache_fingerprints = {};
138 $cert_cache_nodes = {};
139
d8782874 140 my $cinfo = PMG::ClusterConfig->new();
cba17aeb
DM
141
142 foreach my $entry (values %{$cinfo->{ids}}) {
143 my $node = $entry->{name};
144 my $fp = $entry->{fingerprint};
145 if ($node && $fp) {
146 $cert_cache_fingerprints->{$fp} = 1;
147 $cert_cache_nodes->{$node} = $fp;
0854fb22
DM
148 }
149 }
150}
151
152# load and cache cert fingerprint once
153sub initialize_cert_cache {
154 my ($node) = @_;
155
cba17aeb 156 update_cert_cache()
0854fb22
DM
157 if defined($node) && !defined($cert_cache_nodes->{$node});
158}
159
160sub check_cert_fingerprint {
161 my ($cert) = @_;
162
163 # clear cache every 30 minutes at least
cba17aeb 164 update_cert_cache() if time() - $cert_cache_timestamp >= 60*30;
0854fb22
DM
165
166 # get fingerprint of server certificate
167 my $fp;
168 eval {
169 $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256');
170 };
171 return 0 if $@ || !defined($fp) || $fp eq ''; # error
172
173 my $check = sub {
174 for my $expected (keys %$cert_cache_fingerprints) {
175 return 1 if $fp eq $expected;
176 }
177 return 0;
178 };
179
cba17aeb 180 return 1 if $check->();
0854fb22
DM
181
182 # clear cache and retry at most once every minute
183 if (time() - $cert_cache_timestamp >= 60) {
184 syslog ('info', "Could not verify remote node certificate '$fp' with list of pinned certificates, refreshing cache");
185 update_cert_cache();
cba17aeb 186 return $check->();
0854fb22
DM
187 }
188
189 return 0;
190}
191
58072364
DM
192my $sshglobalknownhosts = "/etc/ssh/ssh_known_hosts2";
193my $rootsshauthkeys = "/root/.ssh/authorized_keys";
194my $ssh_rsa_id = "/root/.ssh/id_rsa.pub";
195
196sub update_ssh_keys {
197 my ($cinfo) = @_;
198
199 my $data = '';
200 foreach my $node (values %{$cinfo->{ids}}) {
201 $data .= "$node->{ip} ssh-rsa $node->{hostrsapubkey}\n";
202 $data .= "$node->{name} ssh-rsa $node->{hostrsapubkey}\n";
203 }
204
205 PVE::Tools::file_set_contents($sshglobalknownhosts, $data);
206
207 $data = '';
208
209 # always add ourself
210 if (-f $ssh_rsa_id) {
211 my $pub = PVE::Tools::file_get_contents($ssh_rsa_id);
212 chomp($pub);
213 $data .= "$pub\n";
214 }
215
216 foreach my $node (values %{$cinfo->{ids}}) {
217 $data .= "ssh-rsa $node->{rootrsapubkey} root\@$node->{name}\n";
218 }
219
220 if (-f $rootsshauthkeys) {
a7c7cad7
DM
221 my $mykey = PVE::Tools::file_get_contents($rootsshauthkeys, 128*1024);
222 chomp($mykey);
223 $data .= "$mykey\n";
58072364
DM
224 }
225
226 my $newdata = "";
227 my $vhash = {};
228 my @lines = split(/\n/, $data);
229 foreach my $line (@lines) {
230 if ($line !~ /^#/ && $line =~ m/(^|\s)ssh-(rsa|dsa)\s+(\S+)\s+\S+$/) {
231 next if $vhash->{$3}++;
232 }
233 $newdata .= "$line\n";
234 }
235
236 PVE::Tools::file_set_contents($rootsshauthkeys, $newdata, 0600);
237}
238
a7c7cad7
DM
239my $cfgdir = '/etc/pmg';
240my $syncdir = "$cfgdir/master";
241
242my $cond_commit_synced_file = sub {
243 my ($filename, $dstfn) = @_;
244
245 $dstfn = "$cfgdir/$filename" if !defined($dstfn);
246 my $srcfn = "$syncdir/$filename";
247
248 if (! -f $srcfn) {
249 unlink $dstfn;
250 return;
251 }
252
253 my $new = PVE::Tools::file_get_contents($srcfn, 1024*1024);
254
255 if (-f $dstfn) {
256 my $old = PVE::Tools::file_get_contents($dstfn, 1024*1024);
257 return 0 if $new eq $old;
258 }
259
260 rename($srcfn, $dstfn) ||
261 die "cond_rename_file '$filename' failed - $!\n";
262
263 print STDERR "updated $dstfn\n";
264
265 return 1;
266};
267
c9dae0df
DM
268my $rsync_command = sub {
269 my ($host_key_alias, @args) = @_;
270
271 my $ssh_cmd = '--rsh=ssh -l root -o BatchMode=yes';
272 $ssh_cmd .= " -o HostKeyAlias=${host_key_alias}" if $host_key_alias;
273
274 my $cmd = ['rsync', $ssh_cmd, '-q', @args];
275
276 return $cmd;
277};
278
279sub sync_quarantine_files {
280 my ($host_ip, $host_name, $flistname) = @_;
281
9430b6d4
DM
282 my $spooldir = $PMG::MailQueue::spooldir;
283
c9dae0df
DM
284 my $cmd = $rsync_command->(
285 $host_name, '--timeout', '10', "${host_ip}:$spooldir", $spooldir,
286 '--files-from', $flistname);
287
9430b6d4 288 PVE::Tools::run_command($cmd);
c9dae0df
DM
289}
290
291sub sync_spooldir {
292 my ($host_ip, $host_name, $rcid) = @_;
293
9430b6d4
DM
294 my $spooldir = $PMG::MailQueue::spooldir;
295
c9dae0df
DM
296 mkdir "$spooldir/cluster/";
297 my $syncdir = "$spooldir/cluster/$rcid";
298 mkdir $syncdir;
299
300 my $cmd = $rsync_command->(
301 $host_name, '-aq', '--timeout', '10', "${host_ip}:$syncdir/", $syncdir);
302
303 foreach my $incl (('spam/', 'spam/*', 'spam/*/*', 'virus/', 'virus/*', 'virus/*/*')) {
304 push @$cmd, '--include', $incl;
305 }
306
307 push @$cmd, '--exclude', '*';
308
309 PVE::Tools::run_command($cmd);
310}
311
312sub sync_master_quar {
313 my ($host_ip, $host_name) = @_;
314
9430b6d4
DM
315 my $spooldir = $PMG::MailQueue::spooldir;
316
c9dae0df
DM
317 my $syncdir = "$spooldir/cluster/";
318 mkdir $syncdir;
319
320 my $cmd = $rsync_command->(
321 $host_name, '-aq', '--timeout', '10', "${host_ip}:$syncdir", $syncdir);
322
323 PVE::Tools::run_command($cmd);
324}
325
58072364 326sub sync_config_from_master {
809ae8f4 327 my ($master_name, $master_ip, $noreload) = @_;
58072364 328
58072364 329 mkdir $syncdir;
a7c7cad7 330 File::Path::remove_tree($syncdir, {keep_root => 1});
58072364 331
a7c7cad7
DM
332 my $sa_conf_dir = "/etc/mail/spamassassin";
333 my $sa_custom_cf = "custom.cf";
58072364 334
c9dae0df 335 my $cmd = $rsync_command->(
f4b7112b 336 $master_name, '-aq',
c9dae0df
DM
337 "${master_ip}:$cfgdir/* ${sa_conf_dir}/${sa_custom_cf}",
338 "$syncdir/",
f4b7112b 339 '--exclude', 'master/',
c9dae0df
DM
340 '--exclude', '*~',
341 '--exclude', '*.db',
342 '--exclude', 'pmg-api.pem',
343 '--exclude', 'pmg-tls.pem',
344 );
58072364
DM
345
346 my $errmsg = "syncing master configuration from '${master_ip}' failed";
347 PVE::Tools::run_command($cmd, errmsg => $errmsg);
a7c7cad7
DM
348
349 # verify that the remote host is cluster master
350 open (my $fh, '<', "$syncdir/cluster.conf") ||
351 die "unable to open synced cluster.conf - $!\n";
a7c7cad7 352
809ae8f4
DM
353 my $cinfo = PMG::ClusterConfig::read_cluster_conf('cluster.conf', $fh);
354
355 if (!$cinfo->{master} || ($cinfo->{master}->{ip} ne $master_ip)) {
a7c7cad7
DM
356 die "host '$master_ip' is not cluster master\n";
357 }
358
809ae8f4
DM
359 my $role = $cinfo->{'local'}->{type} // '-';
360 die "local node '$cinfo->{local}->{name}' not part of cluster\n"
a7c7cad7
DM
361 if $role eq '-';
362
809ae8f4 363 die "local node '$cinfo->{local}->{name}' is new cluster master\n"
a7c7cad7
DM
364 if $role eq 'master';
365
a7c7cad7 366 $cond_commit_synced_file->('cluster.conf');
a7c7cad7 367
0757859a
DM
368 PMG::Fetchmail::update_fetchmail_default(0); # disable on slave
369
a7c7cad7
DM
370 my $files = [
371 'pmg-authkey.key',
372 'pmg-authkey.pub',
373 'pmg-csrf.key',
374 'ldap.conf',
375 'user.conf',
7cac3e28
DM
376 'domains',
377 'mynetworks',
378 'transport',
959aaeba 379 'tls_policy',
fd6feef4 380 'fetchmailrc',
a7c7cad7
DM
381 ];
382
383 foreach my $filename (@$files) {
384 $cond_commit_synced_file->($filename);
385 }
386
0757859a 387
a7c7cad7
DM
388 my $force_restart = {};
389
390 if ($cond_commit_synced_file->($sa_custom_cf, "${sa_conf_dir}/${sa_custom_cf}")) {
391 $force_restart->{spam} = 1;
392 }
393
394 $cond_commit_synced_file->('pmg.conf');
f4b7112b
DM
395
396 # sync user templates files/symlinks (not recursive)
397 my $srcdir = "$syncdir/templates";
398 if (-d $srcdir) {
399 my $dstdir = "$cfgdir/templates";
400 mkdir $dstdir;
401 my $names_hash = {};
402 foreach my $fn (<$srcdir/*>) {
403 next if $fn !~ m|^($srcdir/(.*))$|;
404 $fn = $1; # untaint;
405 my $name = $2;
406 $names_hash->{$name} = 1;
407 my $target = "$dstdir/$name";
408 if (-f $fn) {
409 $cond_commit_synced_file->("templates/$name", $target);
410 } elsif (-l $fn) {
411 warn "update $target failed - $!\n" if !rename($fn, $target);
412 }
413 }
414 # remove vanished files
415 foreach my $fn (<$dstdir/*>) {
416 next if $fn !~ m|^($dstdir/(.*))$|;
417 $fn = $1; # untaint;
418 my $name = $2;
419 next if $names_hash->{$name};
420 warn "unlink $fn failed - $!\n" if !unlink($fn);
421 }
422 }
58072364
DM
423}
424
db303db4
DM
425sub sync_ruledb_from_master {
426 my ($ldb, $rdb, $ni, $ticket) = @_;
427
428 my $ruledb = PMG::RuleDB->new($ldb);
429 my $rulecache = PMG::RuleCache->new($ruledb);
430
431 my $conn = PVE::APIClient::LWP->new(
432 ticket => $ticket,
433 cookie_name => 'PMGAuthCookie',
434 host => $ni->{ip},
435 cached_fingerprints => {
436 $ni->{fingerprint} => 1,
437 });
438
439 my $digest = $conn->get("/config/ruledb/digest", {});
440
441 return if $digest eq $rulecache->{digest}; # no changes
442
443 syslog('info', "detected rule database changes - starting sync from '$ni->{ip}'");
444
445 eval {
446 $ldb->begin_work;
447
448 $ldb->do("DELETE FROM Rule");
449 $ldb->do("DELETE FROM RuleGroup");
450 $ldb->do("DELETE FROM ObjectGroup");
451 $ldb->do("DELETE FROM Object");
452 $ldb->do("DELETE FROM Attribut");
453
454 eval {
455 $rdb->begin_work;
456
457 # read a consistent snapshot
458 $rdb->do("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
459
460 PMG::DBTools::copy_table($ldb, $rdb, "Rule");
461 PMG::DBTools::copy_table($ldb, $rdb, "RuleGroup");
462 PMG::DBTools::copy_table($ldb, $rdb, "ObjectGroup");
463 PMG::DBTools::copy_table($ldb, $rdb, "Object", 'value');
464 PMG::DBTools::copy_table($ldb, $rdb, "Attribut", 'value');
465 };
466
467 $rdb->rollback; # end transaction
468
469 die $@ if $@;
470
471 # update sequences
472
473 $ldb->do("SELECT setval('rule_id_seq', max(id)+1) FROM Rule");
474 $ldb->do("SELECT setval('object_id_seq', max(id)+1) FROM Object");
475 $ldb->do("SELECT setval('objectgroup_id_seq', max(id)+1) FROM ObjectGroup");
476
477 $ldb->commit;
478 };
479 if (my $err = $@) {
480 $ldb->rollback;
481 die $err;
482 }
483
484 syslog('info', "finished rule database sync from host '$ni->{ip}'");
485}
486
9430b6d4
DM
487sub sync_quarantine_db {
488 my ($ldb, $rdb, $ni, $rsynctime_ref) = @_;
489
490 my $rcid = $ni->{cid};
491
492 my $maxmails = 100000;
493
494 my $mscount = 0;
495
496 my $ctime = PMG::DBTools::get_remote_time($rdb);
497
498 my $maxcount = 1000;
499
500 my $count;
501
502 PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastid_CMailStore', -1, undef);
503
504 do { # get new values
505
506 $count = 0;
507
508 my $flistname = "/tmp/quarantinefilelist.$$";
509
510 eval {
511 $ldb->begin_work;
512
513 open(my $flistfh, '>', $flistname) ||
514 die "unable to open file '$flistname' - $!\n";
515
516 my $lastid = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastid_CMailStore');
517
518 # sync CMailStore
519
520 my $sth = $rdb->prepare(
521 "SELECT * from CMailstore WHERE cid = ? AND rid > ? " .
522 "ORDER BY cid,rid LIMIT ?");
523 $sth->execute($rcid, $lastid, $maxcount);
524
525 my $maxid;
526 my $callback = sub {
527 my $ref = shift;
528 $maxid = $ref->{rid};
529 print $flistfh "$ref->{file}\n";
530 };
531
532 my $attrs = [qw(cid rid time qtype bytes spamlevel info sender header file)];
533 $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CMailStore', $attrs, $callback);
534
535 close($flistfh);
536
537 my $starttime = [ gettimeofday() ];
538 sync_quarantine_files($ni->{ip}, $ni->{name}, $flistname);
539 $$rsynctime_ref += tv_interval($starttime);
540
541 if ($maxid) {
542 # sync CMSReceivers
543
544 $sth = $rdb->prepare(
545 "SELECT * from CMSReceivers WHERE " .
546 "CMailStore_CID = ? AND CMailStore_RID > ? " .
547 "AND CMailStore_RID <= ?");
548 $sth->execute($rcid, $lastid, $maxid);
549
39d91358 550 $attrs = [qw(cmailstore_cid cmailstore_rid pmail receiver status mtime)];
9430b6d4
DM
551 PMG::DBTools::copy_selected_data($ldb, $sth, 'CMSReceivers', $attrs);
552
553 PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastid_CMailStore', $maxid);
554 }
555
556 $ldb->commit;
557 };
558 my $err = $@;
559
560 unlink $flistname;
561
562 if ($err) {
563 $ldb->rollback;
564 die $err;
565 }
566
567 $mscount += $count;
568
569 last if $mscount >= $maxmails;
570
571 } while ($count >= $maxcount);
572
573 PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastmt_CMSReceivers', 0, undef);
574
575 eval { # synchronize status updates
576 $ldb->begin_work;
577
578 my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers');
579
580 my $sth = $rdb->prepare ("SELECT * from CMSReceivers WHERE mtime >= ? AND status != 'N'");
581 $sth->execute($lastmt);
582
583 my $update_sth = $ldb->prepare(
584 "UPDATE CMSReceivers SET status = ? WHERE " .
585 "CMailstore_CID = ? AND CMailstore_RID = ? AND PMail = ?;");
586 while (my $ref = $sth->fetchrow_hashref()) {
587 $update_sth->execute($ref->{status}, $ref->{cmailstore_cid},
588 $ref->{cmailstore_rid}, $ref->{pmail});
589 }
590
591 PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers', $ctime);
592
593 $ldb->commit;
594 };
595 if (my $err = $@) {
596 $ldb->rollback;
597 die $err;
598 }
599
600 return $mscount;
601}
602
603sub sync_statistic_db {
604 my ($ldb, $rdb, $ni) = @_;
605
606 my $rcid = $ni->{cid};
607
608 my $maxmails = 100000;
609
610 my $mscount = 0;
611
612 my $maxcount = 1000;
613
614 my $count;
615
616 PMG::DBTools::create_clusterinfo_default(
617 $ldb, $rcid, 'lastid_CStatistic', -1, undef);
618
619 do { # get new values
620
621 $count = 0;
622
623 eval {
624 $ldb->begin_work;
625
626 my $lastid = PMG::DBTools::read_int_clusterinfo(
627 $ldb, $rcid, 'lastid_CStatistic');
628
629 # sync CStatistic
630
631 my $sth = $rdb->prepare(
632 "SELECT * from CStatistic " .
633 "WHERE cid = ? AND rid > ? " .
634 "ORDER BY cid, rid LIMIT ?");
635 $sth->execute($rcid, $lastid, $maxcount);
636
637 my $maxid;
638 my $callback = sub {
639 my $ref = shift;
640 $maxid = $ref->{rid};
641 };
642
643 my $attrs = [qw(cid rid time bytes direction spamlevel ptime virusinfo sender)];
644 $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CStatistic', $attrs, $callback);
645
646 if ($maxid) {
647 # sync CReceivers
648
649 $sth = $rdb->prepare(
650 "SELECT * from CReceivers WHERE " .
651 "CStatistic_CID = ? AND CStatistic_RID > ? AND CStatistic_RID <= ?");
652 $sth->execute($rcid, $lastid, $maxid);
653
654 $attrs = [qw(cstatistic_cid cstatistic_rid blocked receiver)];
655 PMG::DBTools::copy_selected_data($ldb, $sth, 'CReceivers', $attrs);
656
657 PMG::DBTools::write_maxint_clusterinfo ($ldb, $rcid, 'lastid_CStatistic', $maxid);
658 }
659
660 $ldb->commit;
661 };
662 if (my $err = $@) {
663 $ldb->rollback;
664 die $err;
665 }
666
667 $mscount += $count;
668
669 last if $mscount >= $maxmails;
670
671 } while ($count >= $maxcount);
e86b85a3
DM
672
673 return $mscount;
9430b6d4
DM
674}
675
986eec31 676my $sync_generic_mtime_db = sub {
9430b6d4
DM
677 my ($ldb, $rdb, $ni, $table, $selectfunc, $mergefunc) = @_;
678
9430b6d4
DM
679 my $ctime = PMG::DBTools::get_remote_time($rdb);
680
681 PMG::DBTools::create_clusterinfo_default($ldb, $ni->{cid}, "lastmt_$table", 0, undef);
682
683 my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $ni->{cid}, "lastmt_$table");
684
4c93256a 685 my $sql_cmd = $selectfunc->($ctime, $lastmt);
9430b6d4 686
4c93256a 687 my $sth = $rdb->prepare($sql_cmd);
9430b6d4
DM
688
689 $sth->execute();
690
986eec31
DM
691 my $updates = 0;
692
4c93256a
DM
693 eval {
694 # use transaction to speedup things
695 my $max = 1000; # UPDATE MAX ENTRIES AT ONCE
4c93256a
DM
696 my $count = 0;
697 while (my $ref = $sth->fetchrow_hashref()) {
9d5bdd2a
DM
698 $ldb->begin_work if !$count;
699 $mergefunc->($ref);
4c93256a
DM
700 if (++$count >= $max) {
701 $count = 0;
702 $ldb->commit;
9430b6d4 703 }
986eec31 704 $updates++;
9430b6d4 705 }
9d5bdd2a
DM
706
707 $ldb->commit if $count;
9430b6d4 708 };
4c93256a
DM
709 if (my $err = $@) {
710 $ldb->rollback;
711 die $err;
9430b6d4
DM
712 }
713
9430b6d4
DM
714 PMG::DBTools::write_maxint_clusterinfo($ldb, $ni->{cid}, "lastmt_$table", $ctime);
715
986eec31
DM
716 return $updates;
717};
9430b6d4 718
5e1408fd
DM
719sub sync_localstat_db {
720 my ($dbh, $rdb, $ni) = @_;
721
722 my $rcid = $ni->{cid};
723
724 my $selectfunc = sub {
725 my ($ctime, $lastmt) = @_;
726 return "SELECT * from LocalStat WHERE mtime >= $lastmt AND cid = $rcid";
727 };
728
729 my $merge_sth = $dbh->prepare(
ebd19c79
DM
730 'INSERT INTO LocalStat (Time, RBLCount, PregreetCount, CID, MTime) ' .
731 'VALUES (?, ?, ?, ?, ?) ' .
5e1408fd 732 'ON CONFLICT (Time, CID) DO UPDATE SET ' .
ebd19c79 733 'RBLCount = excluded.RBLCount, PregreetCount = excluded.PregreetCount, MTime = excluded.MTime');
5e1408fd
DM
734
735 my $mergefunc = sub {
736 my ($ref) = @_;
737
ebd19c79 738 $merge_sth->execute($ref->{time}, $ref->{rblcount}, $ref->{pregreetcount}, $ref->{cid}, $ref->{mtime});
5e1408fd
DM
739 };
740
741 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'LocalStat', $selectfunc, $mergefunc);
742}
743
9430b6d4
DM
744sub sync_greylist_db {
745 my ($dbh, $rdb, $ni) = @_;
746
747 my $selectfunc = sub {
748 my ($ctime, $lastmt) = @_;
749 return "SELECT * from CGreylist WHERE extime >= $ctime AND " .
750 "mtime >= $lastmt AND CID != 0";
751 };
752
2e049252 753 my $merge_sth = $dbh->prepare($PMG::DBTools::cgreylist_merge_sql);
9430b6d4 754 my $mergefunc = sub {
986eec31 755 my ($ref) = @_;
9430b6d4 756
f413f920
DM
757 $merge_sth->execute(
758 $ref->{ipnet}, $ref->{host}, $ref->{sender}, $ref->{receiver},
759 $ref->{instance}, $ref->{rctime}, $ref->{extime}, $ref->{delay},
760 $ref->{blocked}, $ref->{passed}, 0, $ref->{cid});
9430b6d4
DM
761 };
762
986eec31 763 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'CGreylist', $selectfunc, $mergefunc);
9430b6d4
DM
764}
765
766sub sync_userprefs_db {
767 my ($dbh, $rdb, $ni) = @_;
768
769 my $selectfunc = sub {
770 my ($ctime, $lastmt) = @_;
771
772 return "SELECT * from UserPrefs WHERE mtime >= $lastmt";
773 };
774
7d13157e 775 my $merge_sth = $dbh->prepare(
471b05ed 776 "INSERT INTO UserPrefs (PMail, Name, Data, MTime) " .
0527ea1a 777 'VALUES (?, ?, ?, 0) ' .
471b05ed 778 'ON CONFLICT (PMail, Name) DO UPDATE SET ' .
dbdf1298 779 # Note: MTime = 0 ==> this is just a copy from somewhere else, not modified
3a9ffd85 780 'MTime = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN 0 ELSE UserPrefs.MTime END, ' .
0527ea1a 781 'Data = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN excluded.Data ELSE UserPrefs.Data END');
471b05ed 782
9430b6d4 783 my $mergefunc = sub {
986eec31 784 my ($ref) = @_;
9430b6d4 785
7d13157e 786 $merge_sth->execute($ref->{pmail}, $ref->{name}, $ref->{data});
9430b6d4
DM
787 };
788
986eec31 789 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'UserPrefs', $selectfunc, $mergefunc);
9430b6d4
DM
790}
791
792sub sync_domainstat_db {
793 my ($dbh, $rdb, $ni) = @_;
794
795 my $selectfunc = sub {
796 my ($ctime, $lastmt) = @_;
797 return "SELECT * from DomainStat WHERE mtime >= $lastmt";
798 };
799
1c7ea32c
DM
800 my $merge_sth = $dbh->prepare(
801 'INSERT INTO Domainstat ' .
802 '(Time,Domain,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
803 'BouncesIn,BouncesOut,PTimeSum,Mtime) ' .
804 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
805 'ON CONFLICT (Time, Domain) DO UPDATE SET ' .
806 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
807 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
808 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
809 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
810 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
811 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
812
9430b6d4 813 my $mergefunc = sub {
986eec31 814 my ($ref) = @_;
9430b6d4 815
1c7ea32c
DM
816 $merge_sth->execute(
817 $ref->{time}, $ref->{domain}, $ref->{countin}, $ref->{countout},
818 $ref->{bytesin}, $ref->{bytesout},
819 $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout},
820 $ref->{bouncesin}, $ref->{bouncesout}, $ref->{ptimesum}, $ref->{mtime});
9430b6d4
DM
821 };
822
986eec31 823 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DomainStat', $selectfunc, $mergefunc);
9430b6d4
DM
824}
825
826sub sync_dailystat_db {
827 my ($dbh, $rdb, $ni) = @_;
828
829 my $selectfunc = sub {
830 my ($ctime, $lastmt) = @_;
831 return "SELECT * from DailyStat WHERE mtime >= $lastmt";
8bf584ae
DM
832 };
833
834 my $merge_sth = $dbh->prepare(
835 'INSERT INTO DailyStat ' .
836 '(Time,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
837 'BouncesIn,BouncesOut,GreylistCount,SPFCount,RBLCount,PTimeSum,Mtime) ' .
838 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
839 'ON CONFLICT (Time) DO UPDATE SET ' .
840 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
841 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
842 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
843 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
844 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
845 'GreylistCount = excluded.GreylistCount, SPFCount = excluded.SpfCount, ' .
846 'RBLCount = excluded.RBLCount, ' .
847 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
9430b6d4
DM
848
849 my $mergefunc = sub {
986eec31 850 my ($ref) = @_;
9430b6d4 851
dbdf1298 852 $merge_sth->execute(
8bf584ae
DM
853 $ref->{time}, $ref->{countin}, $ref->{countout},
854 $ref->{bytesin}, $ref->{bytesout},
855 $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout},
856 $ref->{bouncesin}, $ref->{bouncesout}, $ref->{greylistcount},
857 $ref->{spfcount}, $ref->{rblcount}, $ref->{ptimesum}, $ref->{mtime});
9430b6d4
DM
858 };
859
986eec31 860 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DailyStat', $selectfunc, $mergefunc);
9430b6d4
DM
861}
862
863sub sync_virusinfo_db {
864 my ($dbh, $rdb, $ni) = @_;
865
866 my $selectfunc = sub {
867 my ($ctime, $lastmt) = @_;
868 return "SELECT * from VirusInfo WHERE mtime >= $lastmt";
869 };
870
80615ad6
DM
871 my $merge_sth = $dbh->prepare(
872 'INSERT INTO VirusInfo (Time,Name,Count,MTime) ' .
873 'VALUES (?,?,?,?) ' .
874 'ON CONFLICT (Time,Name) DO UPDATE SET ' .
875 'Count = excluded.Count , MTime = excluded.MTime');
876
9430b6d4 877 my $mergefunc = sub {
986eec31 878 my ($ref) = @_;
9430b6d4 879
80615ad6 880 $merge_sth->execute($ref->{time}, $ref->{name}, $ref->{count}, $ref->{mtime});
9430b6d4
DM
881 };
882
986eec31 883 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'VirusInfo', $selectfunc, $mergefunc);
9430b6d4
DM
884}
885
886sub sync_deleted_nodes_from_master {
887 my ($ldb, $masterdb, $cinfo, $masterni, $rsynctime_ref) = @_;
888
889 my $rsynctime = 0;
890
891 my $cid_hash = {}; # fast lookup
5c6e6eeb 892 foreach my $ni (values %{$cinfo->{ids}}) {
9430b6d4
DM
893 $cid_hash->{$ni->{cid}} = $ni;
894 }
895
896 my $spooldir = $PMG::MailQueue::spooldir;
897
5c6e6eeb
DM
898 my $maxcid = $cinfo->{master}->{maxcid} // 0;
899
900 for (my $rcid = 1; $rcid <= $maxcid; $rcid++) {
9430b6d4
DM
901 next if $cid_hash->{$rcid};
902
903 my $done_marker = "$spooldir/cluster/$rcid/.synced-deleted-node";
904
905 next if -f $done_marker; # already synced
906
907 syslog('info', "syncing deleted node $rcid from master '$masterni->{ip}'");
908
909 my $starttime = [ gettimeofday() ];
910 sync_spooldir($masterni->{ip}, $masterni->{name}, $rcid);
911 $$rsynctime_ref += tv_interval($starttime);
912
913 my $fake_ni = {
914 ip => $masterni->{ip},
915 name => $masterni->{name},
916 cid => $rcid,
917 };
918
919 sync_quarantine_db($ldb, $masterdb, $fake_ni);
920
921 sync_statistic_db ($ldb, $masterdb, $fake_ni);
922
923 open(my $fh, ">>", $done_marker);
924 }
925}
926
927
0854fb22 9281;