]> git.proxmox.com Git - pmg-api.git/blob - PMG/Cluster.pm
5eb41b5046ae001aa2177086680f4f857c8a58e7
[pmg-api.git] / PMG / Cluster.pm
1 package PMG::Cluster;
2
3 use strict;
4 use warnings;
5 use Data::Dumper;
6 use Socket;
7 use File::Path;
8 use Time::HiRes qw (gettimeofday tv_interval);
9
10 use PVE::SafeSyslog;
11 use PVE::Tools;
12 use PVE::INotify;
13 use PVE::APIClient::LWP;
14
15 use PMG::Utils;
16 use PMG::Config;
17 use PMG::ClusterConfig;
18 use PMG::RuleDB;
19 use PMG::RuleCache;
20 use PMG::MailQueue;
21 use PMG::Fetchmail;
22
# Resolve the IP address of the cluster node called $nodename.
#
# The cluster configuration is searched first. For a node found there the
# result depends on the calling context: just the IP in scalar (or void)
# context, ($ip, $address_family) in list context. Nodes not listed in the
# cluster configuration are resolved via PMG::Utils::lookup_node_ip(), which
# also receives $noerr.
sub remote_node_ip {
    my ($nodename, $noerr) = @_;

    my $cinfo = PMG::ClusterConfig->new();

    for my $node_info (values %{$cinfo->{ids}}) {
        next if $node_info->{name} ne $nodename;

        my $addr = $node_info->{ip};
        if (wantarray) {
            my $family = PVE::Tools::get_host_address_family($addr);
            return ($addr, $family);
        }
        return $addr;
    }

    # fallback: try to get IP by other means
    return PMG::Utils::lookup_node_ip($nodename, $noerr);
}
40
# Return the name of the cluster master node.
#
# $cinfo is an optional, already loaded cluster configuration; when it is
# missing (or false) the configuration is loaded here. Returns 'localhost'
# if the configuration has no master entry.
sub get_master_node {
    my ($cinfo) = @_;

    $cinfo ||= PMG::ClusterConfig->new();

    my $master = $cinfo->{master};

    return defined($master) ? $master->{name} : 'localhost';
}
50
# Read the local API certificate (/etc/pmg/pmg-api.pem) and return its
# SHA-256 fingerprint as produced by Net::SSLeay::X509_get_fingerprint().
#
# Dies with a message naming the certificate path if the file cannot be
# opened, cannot be parsed as a PEM X509 certificate, or if no fingerprint
# can be computed.
sub read_local_ssl_cert_fingerprint {
    my $cert_path = "/etc/pmg/pmg-api.pem";

    my $cert;
    eval {
        my $bio = Net::SSLeay::BIO_new_file($cert_path, 'r');
        # never hand an invalid BIO handle to the PEM reader
        die "unable to open file - $!\n" if !$bio;

        $cert = Net::SSLeay::PEM_read_bio_X509($bio);
        Net::SSLeay::BIO_free($bio);
    };
    if (my $err = $@) {
        die "unable to read certificate '$cert_path' - $err\n";
    }

    # a failed read may be reported as a false (null) handle, not only as undef
    if (!$cert) {
        die "unable to read certificate '$cert_path' - got empty value\n";
    }

    my $fp = eval { Net::SSLeay::X509_get_fingerprint($cert, 'sha256') };
    my $err = $@;

    # the X509 object is no longer needed - release it on every path
    Net::SSLeay::X509_free($cert);

    if ($err) {
        die "unable to get fingerprint for '$cert_path' - $err\n";
    }

    if (!defined($fp) || $fp eq '') {
        die "unable to get fingerprint for '$cert_path' - got empty value\n";
    }

    return $fp;
}
82
my $hostrsapubkey_fn = '/etc/ssh/ssh_host_rsa_key.pub';
my $rootrsakey_fn = '/root/.ssh/id_rsa';
my $rootrsapubkey_fn = '/root/.ssh/id_rsa.pub';

# Read the first line of an OpenSSH RSA public key file and return only the
# base64 encoded key: the leading "ssh-rsa" type field and a trailing
# "root@host" comment are stripped. Dies if the file yields no line or the
# remainder does not look like a base64 key of at least 200 characters.
my $read_rsa_pubkey = sub {
    my ($filename) = @_;

    my $key = PVE::Tools::file_read_firstline($filename);
    die "unable to parse ${filename}\n" if !defined($key);

    $key =~ s/^.*ssh-rsa\s+//i;
    $key =~ s/\s+root\@\S+\s*$//i;

    # base64 data may end with up to two '=' padding characters, depending
    # on the key size - accept them instead of rejecting such keys
    # NOTE(review): other consumers of these values (e.g. the cluster config
    # schema) may need to accept the padding too - confirm
    die "unable to parse ${filename}\n"
        if $key !~ m/^[A-Za-z0-9\.\/\+]{200,}={0,2}$/;

    return $key;
};

# Collect the information describing the local node.
#
# Returns a hash reference with the keys:
#   name          - local node name (PVE::INotify::nodename)
#   ip            - result of PMG::Utils::lookup_node_ip() for that name
#   hostrsapubkey - base64 part of the SSH host RSA public key
#   rootrsapubkey - base64 part of root's SSH RSA public key
#   fingerprint   - fingerprint of the local API certificate
#
# Root's RSA key pair is generated with ssh-keygen if the public key file
# does not exist. Dies if one of the keys cannot be parsed.
sub read_local_cluster_info {

    my $res = {};

    my $hostrsapubkey = $read_rsa_pubkey->($hostrsapubkey_fn);

    my $nodename = PVE::INotify::nodename();

    $res->{name} = $nodename;
    $res->{ip} = PMG::Utils::lookup_node_ip($nodename);
    $res->{hostrsapubkey} = $hostrsapubkey;

    if (! -f $rootrsapubkey_fn) {
        # remove a stale private key first, then create a fresh key pair
        unlink $rootrsakey_fn;
        my $cmd = ['ssh-keygen', '-t', 'rsa', '-N', '', '-b', '2048',
                   '-f', $rootrsakey_fn];
        PMG::Utils::run_silent_cmd($cmd);
    }

    $res->{rootrsapubkey} = $read_rsa_pubkey->($rootrsapubkey_fn);

    $res->{fingerprint} = read_local_ssl_cert_fingerprint();

    return $res;
}
126
127 # X509 Certificate cache helper
128
my $cert_cache_nodes = {};
my $cert_cache_timestamp = time();
my $cert_cache_fingerprints = {};

# Rebuild the certificate fingerprint cache from the cluster configuration.
#
# The timestamp and both lookup tables are reset before the configuration is
# read. Afterwards $cert_cache_nodes maps node name => fingerprint and
# $cert_cache_fingerprints holds every known fingerprint as a key. Entries
# lacking a name or a fingerprint are ignored.
sub update_cert_cache {

    ($cert_cache_timestamp, $cert_cache_fingerprints, $cert_cache_nodes) =
        (time(), {}, {});

    my $cinfo = PMG::ClusterConfig->new();

    for my $node_info (values %{$cinfo->{ids}}) {
        my ($name, $fingerprint) = @$node_info{qw(name fingerprint)};
        next if !($name && $fingerprint);

        $cert_cache_fingerprints->{$fingerprint} = 1;
        $cert_cache_nodes->{$name} = $fingerprint;
    }
}
151
# Make sure the fingerprint cache has been loaded for $node: the cache is
# rebuilt only when a node name is given and no fingerprint is cached for it.
sub initialize_cert_cache {
    my ($node) = @_;

    if (defined($node) && !defined($cert_cache_nodes->{$node})) {
        update_cert_cache();
    }
}
159
# Check whether the SHA-256 fingerprint of $cert is one of the fingerprints
# pinned in the cluster configuration.
#
# Returns 1 on a match and 0 otherwise (also when no fingerprint can be
# computed). The cache is refreshed when it is at least 30 minutes old; on a
# mismatch it is refreshed and re-checked once, but only if it is at least
# one minute old.
sub check_cert_fingerprint {
    my ($cert) = @_;

    # clear cache every 30 minutes at least
    update_cert_cache() if time() - $cert_cache_timestamp >= 60*30;

    # get fingerprint of server certificate
    my $fp = eval { Net::SSLeay::X509_get_fingerprint($cert, 'sha256') };
    return 0 if $@ || !defined($fp) || $fp eq ''; # error

    # the cache is keyed by fingerprint, so a direct lookup is sufficient
    return 1 if $cert_cache_fingerprints->{$fp};

    # clear cache and retry at most once every minute
    if (time() - $cert_cache_timestamp >= 60) {
        syslog ('info', "Could not verify remote node certificate '$fp' with list of pinned certificates, refreshing cache");
        update_cert_cache();
        return $cert_cache_fingerprints->{$fp} ? 1 : 0;
    }

    return 0;
}
191
my $sshglobalknownhosts = "/etc/ssh/ssh_known_hosts2";
my $rootsshauthkeys = "/root/.ssh/authorized_keys";
my $ssh_rsa_id = "/root/.ssh/id_rsa.pub";

# Rewrite the SSH trust files from the cluster configuration $cinfo.
#
# - The global known hosts file gets one "ssh-rsa" line per node IP and per
#   node name and is replaced by exactly that list.
# - root's authorized_keys gets the local public key, the root key of every
#   node and all lines already present, with duplicate ssh-rsa/ssh-dsa keys
#   removed (the first occurrence wins).
#
# Each file is only written if its content would change.
sub update_ssh_keys {
    my ($cinfo) = @_;

    my @nodes = values %{$cinfo->{ids}};

    # --- known hosts ---

    my $known_hosts = '';
    for my $node (@nodes) {
        for my $host_id ($node->{ip}, $node->{name}) {
            $known_hosts .= "$host_id ssh-rsa $node->{hostrsapubkey}\n";
        }
    }

    my $old_known_hosts = '';
    $old_known_hosts = PVE::Tools::file_get_contents($sshglobalknownhosts, 1024*1024)
        if -f $sshglobalknownhosts;

    PVE::Tools::file_set_contents($sshglobalknownhosts, $known_hosts)
        if $old_known_hosts ne $known_hosts;

    # --- authorized keys ---

    my $candidates = '';

    # always add ourself
    if (-f $ssh_rsa_id) {
        my $pub = PVE::Tools::file_get_contents($ssh_rsa_id);
        chomp($pub);
        $candidates .= "$pub\n";
    }

    for my $node (@nodes) {
        $candidates .= "ssh-rsa $node->{rootrsapubkey} root\@$node->{name}\n";
    }

    # Read the current file exactly once: the same content is merged into the
    # new list and used for the change detection below, so both always refer
    # to the same state of the file.
    my $old_auth_keys = '';
    if (-f $rootsshauthkeys) {
        $old_auth_keys = PVE::Tools::file_get_contents($rootsshauthkeys, 128*1024);
        my $existing = $old_auth_keys;
        chomp($existing);
        $candidates .= "$existing\n";
    }

    # drop repeated keys; comments and unrecognized lines are kept untouched
    my $newdata = '';
    my $seen = {};
    for my $line (split(/\n/, $candidates)) {
        if ($line !~ /^#/ && $line =~ m/(^|\s)ssh-(rsa|dsa)\s+(\S+)\s+\S+$/) {
            next if $seen->{$3}++;
        }
        $newdata .= "$line\n";
    }

    PVE::Tools::file_set_contents($rootsshauthkeys, $newdata, 0600)
        if $old_auth_keys ne $newdata;
}
249
my $cfgdir = '/etc/pmg';
my $syncdir = "$cfgdir/master";

# Move a file received from the master into place if its content differs.
#
# $filename is relative to $syncdir; $dstfn defaults to "$cfgdir/$filename".
# - source missing: the destination is unlinked, returns nothing
# - content equal:  nothing is done, returns 0
# - otherwise:      the source is touched and renamed over the destination,
#                   a note is printed to STDERR, returns 1
# Dies if the rename fails.
my $cond_commit_synced_file = sub {
    my ($filename, $dstfn) = @_;

    $dstfn //= "$cfgdir/$filename";
    my $srcfn = "$syncdir/$filename";

    unless (-f $srcfn) {
        unlink $dstfn;
        return;
    }

    my $incoming = PVE::Tools::file_get_contents($srcfn, 1024*1024);

    my $unchanged = -f $dstfn
        && $incoming eq PVE::Tools::file_get_contents($dstfn, 1024*1024);
    return 0 if $unchanged;

    # set mtime (touch) to avoid time drift problems
    utime(undef, undef, $srcfn);

    if (!rename($srcfn, $dstfn)) {
        die "cond_rename_file '$filename' failed - $!\n";
    }

    print STDERR "updated $dstfn\n";

    return 1;
};
281
# Build the argument list (array reference) for an rsync call over ssh.
#
# The remote shell is "ssh -l root" in batch mode; if $host_key_alias is
# true it is passed to ssh as HostKeyAlias. '-q' and the remaining @args are
# appended unchanged.
my $rsync_command = sub {
    my ($host_key_alias, @args) = @_;

    my @rsh_parts = ('--rsh=ssh -l root -o BatchMode=yes');
    push @rsh_parts, "-o HostKeyAlias=${host_key_alias}" if $host_key_alias;

    return ['rsync', join(' ', @rsh_parts), '-q', @args];
};
292
# Fetch the files named in the list file $flistname from the mail spool of
# the remote node ($host_ip, ssh host key alias $host_name) into the local
# spool directory. The per-node directory "cluster/$rcid" is created first.
sub sync_quarantine_files {
    my ($host_ip, $host_name, $flistname, $rcid) = @_;

    my $spooldir = $PMG::MailQueue::spooldir;
    my $node_dir = "$spooldir/cluster/$rcid";

    # best effort - the directories may already exist
    mkdir "$spooldir/cluster/";
    mkdir $node_dir;

    my @rsync_args = (
        '--timeout', '10',
        "${host_ip}:$spooldir", $spooldir,
        '--files-from', $flistname,
    );

    PVE::Tools::run_command($rsync_command->($host_name, @rsync_args));
}
308
# Mirror the spool directory "cluster/$rcid" from the remote node
# ($host_ip, ssh host key alias $host_name) with rsync. Only the 'spam' and
# 'virus' sub trees (up to two levels deep) are included, everything else is
# excluded.
sub sync_spooldir {
    my ($host_ip, $host_name, $rcid) = @_;

    my $spooldir = $PMG::MailQueue::spooldir;
    my $node_dir = "$spooldir/cluster/$rcid";

    # best effort - the directories may already exist
    mkdir "$spooldir/cluster/";
    mkdir $node_dir;

    my @filter = map { ('--include', $_) }
        ('spam/', 'spam/*', 'spam/*/*', 'virus/', 'virus/*', 'virus/*/*');

    my $cmd = $rsync_command->(
        $host_name, '-aq', '--timeout', '10', "${host_ip}:$node_dir/", $node_dir,
        @filter, '--exclude', '*');

    PVE::Tools::run_command($cmd);
}
329
# Mirror the complete "cluster/" spool directory from the given host
# ($host_ip, ssh host key alias $host_name) with rsync in archive mode.
sub sync_master_quar {
    my ($host_ip, $host_name) = @_;

    my $cluster_dir = "$PMG::MailQueue::spooldir/cluster/";

    # best effort - the directory may already exist
    mkdir $cluster_dir;

    my @rsync_args = ('-aq', '--timeout', '10', "${host_ip}:$cluster_dir", $cluster_dir);

    PVE::Tools::run_command($rsync_command->($host_name, @rsync_args));
}
343
# Pull the configuration from the master node and activate it locally.
#
# Steps:
#  1. empty $syncdir and rsync the master's $cfgdir content plus the
#     SpamAssassin custom.cf into it (without databases, backup files and
#     the certificates)
#  2. parse the synced cluster.conf and verify that $master_ip really is
#     the master and that the local node is a non-master cluster member
#  3. commit cluster.conf, rewrite the ssh keys, disable fetchmail
#  4. commit the remaining configuration files and the user templates
#
# Dies if the rsync fails or if one of the checks in step 2 fails.
#
# NOTE(review): $noreload is not used here, and a changed custom.cf is
# committed without anything restarting a service in this function - confirm
# that this is handled elsewhere.
sub sync_config_from_master {
    my ($master_name, $master_ip, $noreload) = @_;

    mkdir $syncdir;
    File::Path::remove_tree($syncdir, {keep_root => 1});

    my $sa_conf_dir = "/etc/mail/spamassassin";
    my $sa_custom_cf = "custom.cf";

    my $cmd = $rsync_command->(
        $master_name, '-aq',
        "${master_ip}:$cfgdir/* ${sa_conf_dir}/${sa_custom_cf}",
        "$syncdir/",
        '--exclude', 'master/',
        '--exclude', '*~',
        '--exclude', '*.db',
        '--exclude', 'pmg-api.pem',
        '--exclude', 'pmg-tls.pem',
    );

    my $errmsg = "syncing master configuration from '${master_ip}' failed";
    PVE::Tools::run_command($cmd, errmsg => $errmsg);

    # verify that the remote host is cluster master
    open(my $fh, '<', "$syncdir/cluster.conf") ||
        die "unable to open synced cluster.conf - $!\n";

    my $cinfo = PMG::ClusterConfig::read_cluster_conf('cluster.conf', $fh);

    # done with the file - do not keep it open while it gets renamed below
    close($fh);

    if (!$cinfo->{master} || ($cinfo->{master}->{ip} ne $master_ip)) {
        die "host '$master_ip' is not cluster master\n";
    }

    my $role = $cinfo->{'local'}->{type} // '-';
    die "local node '$cinfo->{local}->{name}' not part of cluster\n"
        if $role eq '-';

    die "local node '$cinfo->{local}->{name}' is new cluster master\n"
        if $role eq 'master';

    $cond_commit_synced_file->('cluster.conf');

    update_ssh_keys($cinfo); # rewrite ssh keys

    PMG::Fetchmail::update_fetchmail_default(0); # disable on slave

    my $files = [
        'pmg-authkey.key',
        'pmg-authkey.pub',
        'pmg-csrf.key',
        'ldap.conf',
        'user.conf',
        'domains',
        'mynetworks',
        'transport',
        'tls_policy',
        'fetchmailrc',
    ];

    foreach my $filename (@$files) {
        $cond_commit_synced_file->($filename);
    }

    # custom.cf lives outside of $cfgdir, so pass the target explicitly
    $cond_commit_synced_file->($sa_custom_cf, "${sa_conf_dir}/${sa_custom_cf}");

    $cond_commit_synced_file->('pmg.conf');

    # sync user templates files/symlinks (not recursive)
    my $srcdir = "$syncdir/templates";
    if (-d $srcdir) {
        my $dstdir = "$cfgdir/templates";
        mkdir $dstdir;
        my $names_hash = {};
        foreach my $path (glob("$srcdir/*")) {
            next if $path !~ m|^($srcdir/(.*))$|;
            my ($fn, $name) = ($1, $2); # untaint
            $names_hash->{$name} = 1;
            my $target = "$dstdir/$name";
            if (-f $fn) {
                $cond_commit_synced_file->("templates/$name", $target);
            } elsif (-l $fn) {
                warn "update $target failed - $!\n" if !rename($fn, $target);
            }
        }
        # remove vanished files
        foreach my $path (glob("$dstdir/*")) {
            next if $path !~ m|^($dstdir/(.*))$|;
            my ($fn, $name) = ($1, $2); # untaint
            next if $names_hash->{$name};
            warn "unlink $fn failed - $!\n" if !unlink($fn);
        }
    }
}
444
# Replace the local rule database with a copy of the master's, if it differs.
#
# $ldb    - local database handle
# $rdb    - database handle connected to the master
# $ni     - node info of the master (ip and fingerprint are used)
# $ticket - authentication ticket for the master's API
#
# The rule database digest is fetched via the API; nothing happens when it
# equals the digest of the local rule cache. Otherwise all rule tables are
# emptied and refilled inside one local transaction, which is rolled back
# (and the error re-thrown) if anything fails.
sub sync_ruledb_from_master {
    my ($ldb, $rdb, $ni, $ticket) = @_;

    my $ruledb = PMG::RuleDB->new($ldb);
    my $rulecache = PMG::RuleCache->new($ruledb);

    my $conn = PVE::APIClient::LWP->new(
        ticket => $ticket,
        cookie_name => 'PMGAuthCookie',
        host => $ni->{ip},
        cached_fingerprints => {
            $ni->{fingerprint} => 1,
        });

    my $digest = $conn->get("/config/ruledb/digest", {});

    return if $digest eq $rulecache->{digest}; # no changes

    syslog('info', "detected rule database changes - starting sync from '$ni->{ip}'");

    eval {
        $ldb->begin_work;

        $ldb->do("DELETE FROM Rule");
        $ldb->do("DELETE FROM RuleGroup");
        $ldb->do("DELETE FROM ObjectGroup");
        $ldb->do("DELETE FROM Object");
        $ldb->do("DELETE FROM Attribut");

        eval {
            $rdb->begin_work;

            # read a consistent snapshot
            $rdb->do("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");

            PMG::DBTools::copy_table($ldb, $rdb, "Rule");
            PMG::DBTools::copy_table($ldb, $rdb, "RuleGroup");
            PMG::DBTools::copy_table($ldb, $rdb, "ObjectGroup");
            PMG::DBTools::copy_table($ldb, $rdb, "Object", 'value');
            PMG::DBTools::copy_table($ldb, $rdb, "Attribut", 'value');
        };
        # Save the error right away: $@ is a global that the rollback call
        # below may reset, which would silently commit emptied tables.
        my $copy_err = $@;

        $rdb->rollback; # end transaction

        die $copy_err if $copy_err;

        # update sequences

        $ldb->do("SELECT setval('rule_id_seq', max(id)+1) FROM Rule");
        $ldb->do("SELECT setval('object_id_seq', max(id)+1) FROM Object");
        $ldb->do("SELECT setval('objectgroup_id_seq', max(id)+1) FROM ObjectGroup");

        $ldb->commit;
    };
    if (my $err = $@) {
        $ldb->rollback;
        die $err;
    }

    syslog('info', "finished rule database sync from host '$ni->{ip}'");
}
506
# Copy new quarantine entries of node $ni->{cid} from the remote database
# $rdb into the local database $ldb, fetch the referenced mail files, and
# synchronize receiver status updates.
#
# $rsynctime_ref is a reference to a number; the time spent in rsync is
# added to it.
#
# New CMailStore rows are copied in batches of 1000, each batch in its own
# local transaction, until a batch is not full or 100000 rows were copied.
# The last copied row id is remembered in the 'lastid_CMailStore' cluster
# info entry. Returns the number of copied CMailStore rows. On errors the
# current local transaction is rolled back and the error is re-thrown.
sub sync_quarantine_db {
    my ($ldb, $rdb, $ni, $rsynctime_ref) = @_;

    my $rcid = $ni->{cid};

    my $maxmails = 100000;

    my $mscount = 0;

    my $ctime = PMG::DBTools::get_remote_time($rdb);

    my $maxcount = 1000;

    PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastid_CMailStore', -1, undef);

    # A real loop block is required here: 'last' cannot be used to leave a
    # do {} while construct.
    while (1) { # get new values

        my $count = 0;

        # NOTE(review): predictable file name in /tmp - consider File::Temp
        my $flistname = "/tmp/quarantinefilelist.$$";

        eval {
            $ldb->begin_work;

            open(my $flistfh, '>', $flistname) ||
                die "unable to open file '$flistname' - $!\n";

            my $lastid = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastid_CMailStore');

            # sync CMailStore

            my $sth = $rdb->prepare(
                "SELECT * from CMailstore WHERE cid = ? AND rid > ? " .
                "ORDER BY cid,rid LIMIT ?");
            $sth->execute($rcid, $lastid, $maxcount);

            my $maxid;
            my $callback = sub {
                my $ref = shift;
                $maxid = $ref->{rid};
                my $filename = $ref->{file};
                # skip files generated before cluster was created
                return if $filename !~ m!^cluster/!;
                print $flistfh "$filename\n";
            };

            my $attrs = [qw(cid rid time qtype bytes spamlevel info sender header file)];
            $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CMailStore', $attrs, $callback);

            close($flistfh);

            my $starttime = [ gettimeofday() ];
            sync_quarantine_files($ni->{ip}, $ni->{name}, $flistname, $rcid);
            $$rsynctime_ref += tv_interval($starttime);

            if ($maxid) {
                # sync CMSReceivers

                $sth = $rdb->prepare(
                    "SELECT * from CMSReceivers WHERE " .
                    "CMailStore_CID = ? AND CMailStore_RID > ? " .
                    "AND CMailStore_RID <= ?");
                $sth->execute($rcid, $lastid, $maxid);

                $attrs = [qw(cmailstore_cid cmailstore_rid pmail receiver ticketid status mtime)];
                PMG::DBTools::copy_selected_data($ldb, $sth, 'CMSReceivers', $attrs);

                PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastid_CMailStore', $maxid);
            }

            $ldb->commit;
        };
        my $err = $@;

        unlink $flistname;

        if ($err) {
            $ldb->rollback;
            die $err;
        }

        $mscount += $count;

        last if $mscount >= $maxmails; # enough for one run
        last if $count < $maxcount;    # batch not full - nothing left
    }

    PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastmt_CMSReceivers', 0, undef);

    eval { # synchronize status updates
        $ldb->begin_work;

        my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers');

        my $sth = $rdb->prepare ("SELECT * from CMSReceivers WHERE mtime >= ? AND status != 'N'");
        $sth->execute($lastmt);

        my $update_sth = $ldb->prepare(
            "UPDATE CMSReceivers SET status = ? WHERE " .
            "CMailstore_CID = ? AND CMailstore_RID = ? AND TicketID = ?");
        while (my $ref = $sth->fetchrow_hashref()) {
            $update_sth->execute($ref->{status}, $ref->{cmailstore_cid},
                                 $ref->{cmailstore_rid}, $ref->{ticketid});
        }

        PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers', $ctime);

        $ldb->commit;
    };
    if (my $err = $@) {
        $ldb->rollback;
        die $err;
    }

    return $mscount;
}
625
# Copy new statistic entries of node $ni->{cid} from the remote database
# $rdb into the local database $ldb.
#
# New CStatistic rows and the matching CReceivers rows are copied in batches
# of 1000, each batch in its own local transaction, until a batch is not
# full or 100000 rows were copied. The position is remembered in the
# 'lastid_CStatistic' cluster info entry. Returns the number of copied
# CStatistic rows. On errors the current local transaction is rolled back
# and the error is re-thrown.
sub sync_statistic_db {
    my ($ldb, $rdb, $ni) = @_;

    my $rcid = $ni->{cid};

    my $maxmails = 100000;

    my $mscount = 0;

    my $maxcount = 1000;

    PMG::DBTools::create_clusterinfo_default(
        $ldb, $rcid, 'lastid_CStatistic', -1, undef);

    # A real loop block is required here: 'last' cannot be used to leave a
    # do {} while construct.
    while (1) { # get new values

        my $count = 0;

        eval {
            $ldb->begin_work;

            my $lastid = PMG::DBTools::read_int_clusterinfo(
                $ldb, $rcid, 'lastid_CStatistic');

            # sync CStatistic

            my $sth = $rdb->prepare(
                "SELECT * from CStatistic " .
                "WHERE cid = ? AND rid > ? " .
                "ORDER BY cid, rid LIMIT ?");
            $sth->execute($rcid, $lastid, $maxcount);

            my $maxid;
            my $callback = sub {
                my $ref = shift;
                $maxid = $ref->{rid};
            };

            my $attrs = [qw(cid rid time bytes direction spamlevel ptime virusinfo sender)];
            $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CStatistic', $attrs, $callback);

            if ($maxid) {
                # sync CReceivers

                $sth = $rdb->prepare(
                    "SELECT * from CReceivers WHERE " .
                    "CStatistic_CID = ? AND CStatistic_RID > ? AND CStatistic_RID <= ?");
                $sth->execute($rcid, $lastid, $maxid);

                $attrs = [qw(cstatistic_cid cstatistic_rid blocked receiver)];
                PMG::DBTools::copy_selected_data($ldb, $sth, 'CReceivers', $attrs);
            }

            PMG::DBTools::write_maxint_clusterinfo ($ldb, $rcid, 'lastid_CStatistic', $maxid);

            $ldb->commit;
        };
        if (my $err = $@) {
            $ldb->rollback;
            die $err;
        }

        $mscount += $count;

        last if $mscount >= $maxmails; # enough for one run
        last if $count < $maxcount;    # batch not full - nothing left
    }

    return $mscount;
}
698
# Generic helper to merge rows modified since the last sync of $table.
#
# $ldb, $rdb  - local and remote database handle
# $ni         - node info; its cid selects the cluster info entry
# $table      - table name, used for the "lastmt_$table" cluster info key
# $selectfunc - called as ($ctime, $lastmt), returns the SELECT statement
#               executed on the remote database
# $mergefunc  - called with each fetched row (hash reference)
#
# Rows are merged inside local transactions of at most 1000 rows. After all
# rows are processed, the remote time read at the start is stored as the new
# "lastmt_$table" value. Returns the number of processed rows; on errors the
# local transaction is rolled back and the error is re-thrown.
my $sync_generic_mtime_db = sub {
    my ($ldb, $rdb, $ni, $table, $selectfunc, $mergefunc) = @_;

    my $info_key = "lastmt_$table";

    my $ctime = PMG::DBTools::get_remote_time($rdb);

    PMG::DBTools::create_clusterinfo_default($ldb, $ni->{cid}, $info_key, 0, undef);

    my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $ni->{cid}, $info_key);

    my $sth = $rdb->prepare($selectfunc->($ctime, $lastmt));
    $sth->execute();

    my $updates = 0;

    eval {
        # use transaction to speedup things
        my $batch_size = 1000; # UPDATE MAX ENTRIES AT ONCE
        my $pending = 0;

        while (my $row = $sth->fetchrow_hashref()) {
            $ldb->begin_work if $pending == 0;

            $mergefunc->($row);
            $pending++;

            if ($pending >= $batch_size) {
                $pending = 0;
                $ldb->commit;
            }

            $updates++;
        }

        # commit the last, partially filled batch
        $ldb->commit if $pending;
    };
    if (my $err = $@) {
        $ldb->rollback;
        die $err;
    }

    PMG::DBTools::write_maxint_clusterinfo($ldb, $ni->{cid}, $info_key, $ctime);

    return $updates;
};
741
# Merge LocalStat rows of node $ni->{cid} modified since the last sync from
# the remote database $rdb into the local database $dbh. Existing rows
# (same Time and CID) are overwritten. Returns the number of merged rows.
sub sync_localstat_db {
    my ($dbh, $rdb, $ni) = @_;

    my $rcid = $ni->{cid};

    my $build_select = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from LocalStat WHERE mtime >= $lastmt AND cid = $rcid";
    };

    my $upsert = $dbh->prepare(
        'INSERT INTO LocalStat (Time, RBLCount, PregreetCount, CID, MTime) ' .
        'VALUES (?, ?, ?, ?, ?) ' .
        'ON CONFLICT (Time, CID) DO UPDATE SET ' .
        'RBLCount = excluded.RBLCount, PregreetCount = excluded.PregreetCount, MTime = excluded.MTime');

    my @columns = qw(time rblcount pregreetcount cid mtime);

    my $merge_row = sub {
        my ($row) = @_;

        $upsert->execute(@$row{@columns});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'LocalStat', $build_select, $merge_row);
}
766
# Merge not yet expired CGreylist rows (CID != 0) modified since the last
# sync from the remote database $rdb into the local database $dbh, using the
# merge statement provided by PMG::DBTools. Returns the number of merged
# rows.
sub sync_greylist_db {
    my ($dbh, $rdb, $ni) = @_;

    my $build_select = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from CGreylist WHERE extime >= $ctime AND " .
            "mtime >= $lastmt AND CID != 0";
    };

    my $upsert = $dbh->prepare($PMG::DBTools::cgreylist_merge_sql);

    my @columns = qw(
        ipnet host sender receiver
        instance rctime extime delay
        blocked passed
    );

    my $merge_row = sub {
        my ($row) = @_;

        # the literal 0 fills the placeholder between 'passed' and 'cid'
        $upsert->execute(@$row{@columns}, 0, $row->{cid});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'CGreylist', $build_select, $merge_row);
}
788
# Merge UserPrefs rows modified since the last sync from the remote database
# $rdb into the local database $dbh. For an existing (PMail, Name) entry the
# remote data only wins if its MTime is not older than the local one; the
# local MTime is then set to 0. Returns the number of processed rows.
sub sync_userprefs_db {
    my ($dbh, $rdb, $ni) = @_;

    my $build_select = sub {
        my ($ctime, $lastmt) = @_;

        return "SELECT * from UserPrefs WHERE mtime >= $lastmt";
    };

    my $upsert = $dbh->prepare(
        "INSERT INTO UserPrefs (PMail, Name, Data, MTime) " .
        'VALUES (?, ?, ?, ?) ' .
        'ON CONFLICT (PMail, Name) DO UPDATE SET ' .
        # Note: MTime = 0 ==> this is just a copy from somewhere else, not modified
        'MTime = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN 0 ELSE UserPrefs.MTime END, ' .
        'Data = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN excluded.Data ELSE UserPrefs.Data END');

    my @columns = qw(pmail name data mtime);

    my $merge_row = sub {
        my ($row) = @_;

        $upsert->execute(@$row{@columns});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'UserPrefs', $build_select, $merge_row);
}
814
# Merge DomainStat rows modified since the last sync from the remote
# database $rdb into the local database $dbh. Existing rows (same Time and
# Domain) are overwritten with the remote values. Returns the number of
# merged rows.
sub sync_domainstat_db {
    my ($dbh, $rdb, $ni) = @_;

    my $build_select = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from DomainStat WHERE mtime >= $lastmt";
    };

    my $upsert = $dbh->prepare(
        'INSERT INTO Domainstat ' .
        '(Time,Domain,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
        'BouncesIn,BouncesOut,PTimeSum,Mtime) ' .
        'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
        'ON CONFLICT (Time, Domain) DO UPDATE SET ' .
        'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
        'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
        'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
        'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
        'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
        'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');

    # same order as the column list of the INSERT statement
    my @columns = qw(
        time domain countin countout
        bytesin bytesout
        virusin virusout spamin spamout
        bouncesin bouncesout ptimesum mtime
    );

    my $merge_row = sub {
        my ($row) = @_;

        $upsert->execute(@$row{@columns});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DomainStat', $build_select, $merge_row);
}
848
# Merge DailyStat rows modified since the last sync from the remote database
# $rdb into the local database $dbh. Existing rows (same Time) are
# overwritten with the remote values. Returns the number of merged rows.
sub sync_dailystat_db {
    my ($dbh, $rdb, $ni) = @_;

    my $build_select = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from DailyStat WHERE mtime >= $lastmt";
    };

    my $upsert = $dbh->prepare(
        'INSERT INTO DailyStat ' .
        '(Time,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
        'BouncesIn,BouncesOut,GreylistCount,SPFCount,RBLCount,PTimeSum,Mtime) ' .
        'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
        'ON CONFLICT (Time) DO UPDATE SET ' .
        'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
        'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
        'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
        'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
        'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
        'GreylistCount = excluded.GreylistCount, SPFCount = excluded.SpfCount, ' .
        'RBLCount = excluded.RBLCount, ' .
        'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');

    # same order as the column list of the INSERT statement
    my @columns = qw(
        time countin countout
        bytesin bytesout
        virusin virusout spamin spamout
        bouncesin bouncesout greylistcount
        spfcount rblcount ptimesum mtime
    );

    my $merge_row = sub {
        my ($row) = @_;

        $upsert->execute(@$row{@columns});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DailyStat', $build_select, $merge_row);
}
885
# Merge VirusInfo rows modified since the last sync from the remote database
# $rdb into the local database $dbh. Existing rows (same Time and Name) get
# the remote Count and MTime. Returns the number of merged rows.
sub sync_virusinfo_db {
    my ($dbh, $rdb, $ni) = @_;

    my $build_select = sub {
        my ($ctime, $lastmt) = @_;
        return "SELECT * from VirusInfo WHERE mtime >= $lastmt";
    };

    my $upsert = $dbh->prepare(
        'INSERT INTO VirusInfo (Time,Name,Count,MTime) ' .
        'VALUES (?,?,?,?) ' .
        'ON CONFLICT (Time,Name) DO UPDATE SET ' .
        'Count = excluded.Count , MTime = excluded.MTime');

    my @columns = qw(time name count mtime);

    my $merge_row = sub {
        my ($row) = @_;

        $upsert->execute(@$row{@columns});
    };

    return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'VirusInfo', $build_select, $merge_row);
}
908
# Fetch the data of nodes that were removed from the cluster from the master.
#
# $ldb           - local database handle
# $masterdb      - database handle connected to the master
# $cinfo         - cluster configuration
# $masterni      - node info of the master (ip and name are used)
# $rsynctime_ref - reference to a number; time spent in rsync is added to it
#
# Every cluster id between 1 and the master's 'maxcid' that is not part of
# the cluster configuration is treated as a deleted node: its spool
# directory, quarantine database entries and statistic entries are synced
# from the master. A marker file inside the node's spool directory records
# the completed sync, so each deleted node is handled only once.
sub sync_deleted_nodes_from_master {
    my ($ldb, $masterdb, $cinfo, $masterni, $rsynctime_ref) = @_;

    my $cid_hash = {}; # fast lookup
    foreach my $ni (values %{$cinfo->{ids}}) {
        $cid_hash->{$ni->{cid}} = $ni;
    }

    my $spooldir = $PMG::MailQueue::spooldir;

    my $maxcid = $cinfo->{master}->{maxcid} // 0;

    for (my $rcid = 1; $rcid <= $maxcid; $rcid++) {
        next if $cid_hash->{$rcid};

        my $done_marker = "$spooldir/cluster/$rcid/.synced-deleted-node";

        next if -f $done_marker; # already synced

        syslog('info', "syncing deleted node $rcid from master '$masterni->{ip}'");

        my $starttime = [ gettimeofday() ];
        sync_spooldir($masterni->{ip}, $masterni->{name}, $rcid);
        $$rsynctime_ref += tv_interval($starttime);

        # the data of the deleted node is served by the master
        my $fake_ni = {
            ip => $masterni->{ip},
            name => $masterni->{name},
            cid => $rcid,
        };

        # pass the time accumulator on, so the rsync time spent on the
        # quarantine files is accounted for as well
        sync_quarantine_db($ldb, $masterdb, $fake_ni, $rsynctime_ref);

        sync_statistic_db ($ldb, $masterdb, $fake_ni);

        # Without the marker the node is synced again on the next run, so
        # only report the problem instead of aborting.
        if (open(my $fh, ">>", $done_marker)) {
            close($fh);
        } else {
            syslog('err', "unable to create marker file '$done_marker' - $!");
        }
    }
}
949
950
951 1;