]> git.proxmox.com Git - pmg-api.git/blame - src/PMG/Cluster.pm
add /etc/pmg/pbs to cluster-sync
[pmg-api.git] / src / PMG / Cluster.pm
CommitLineData
0854fb22
DM
1package PMG::Cluster;
2
3use strict;
4use warnings;
45e68618 5use Data::Dumper;
0854fb22 6use Socket;
cfdf6608 7use File::Path;
9430b6d4 8use Time::HiRes qw (gettimeofday tv_interval);
45e68618 9
a7c7cad7 10use PVE::SafeSyslog;
0854fb22
DM
11use PVE::Tools;
12use PVE::INotify;
0757859a 13use PVE::APIClient::LWP;
b0d26b8f 14use PVE::Network;
0854fb22 15
1fb2ab76 16use PMG::Utils;
a7c7cad7 17use PMG::Config;
9f67f5b3 18use PMG::ClusterConfig;
db303db4
DM
19use PMG::RuleDB;
20use PMG::RuleCache;
9430b6d4 21use PMG::MailQueue;
0757859a 22use PMG::Fetchmail;
0854fb22 23
8737f93a
DM
24sub remote_node_ip {
25 my ($nodename, $noerr) = @_;
26
d8782874 27 my $cinfo = PMG::ClusterConfig->new();
8737f93a 28
45e68618 29 foreach my $entry (values %{$cinfo->{ids}}) {
8737f93a
DM
30 if ($entry->{name} eq $nodename) {
31 my $ip = $entry->{ip};
32 return $ip if !wantarray;
33 my $family = PVE::Tools::get_host_address_family($ip);
34 return ($ip, $family);
35 }
36 }
37
38 # fallback: try to get IP by other means
b0d26b8f 39 return PVE::Network::get_ip_from_hostname($nodename, $noerr);
8737f93a
DM
40}
41
d2e43f9e
DM
42sub get_master_node {
43 my ($cinfo) = @_;
44
d8782874 45 $cinfo = PMG::ClusterConfig->new() if !$cinfo;
d2e43f9e
DM
46
47 return $cinfo->{master}->{name} if defined($cinfo->{master});
48
49 return 'localhost';
50}
51
cba17aeb
DM
52sub read_local_ssl_cert_fingerprint {
53 my $cert_path = "/etc/pmg/pmg-api.pem";
0854fb22 54
cba17aeb
DM
55 my $cert;
56 eval {
57 my $bio = Net::SSLeay::BIO_new_file($cert_path, 'r');
58 $cert = Net::SSLeay::PEM_read_bio_X509($bio);
59 Net::SSLeay::BIO_free($bio);
60 };
61 if (my $err = $@) {
62 die "unable to read certificate '$cert_path' - $err\n";
63 }
0854fb22 64
cba17aeb
DM
65 if (!defined($cert)) {
66 die "unable to read certificate '$cert_path' - got empty value\n";
67 }
68
69 my $fp;
70 eval {
71 $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256');
72 };
73 if (my $err = $@) {
74 die "unable to get fingerprint for '$cert_path' - $err\n";
75 }
0854fb22 76
cba17aeb
DM
77 if (!defined($fp) || $fp eq '') {
78 die "unable to get fingerprint for '$cert_path' - got empty value\n";
79 }
0854fb22 80
cba17aeb
DM
81 return $fp;
82}
0854fb22 83
cba17aeb
DM
84my $hostrsapubkey_fn = '/etc/ssh/ssh_host_rsa_key.pub';
85my $rootrsakey_fn = '/root/.ssh/id_rsa';
86my $rootrsapubkey_fn = '/root/.ssh/id_rsa.pub';
0854fb22 87
cba17aeb
DM
88sub read_local_cluster_info {
89
90 my $res = {};
91
92 my $hostrsapubkey = PVE::Tools::file_read_firstline($hostrsapubkey_fn);
93 $hostrsapubkey =~ s/^.*ssh-rsa\s+//i;
94 $hostrsapubkey =~ s/\s+root\@\S+\s*$//i;
95
96 die "unable to parse ${hostrsapubkey_fn}\n"
97 if $hostrsapubkey !~ m/^[A-Za-z0-9\.\/\+]{200,}$/;
0854fb22
DM
98
99 my $nodename = PVE::INotify::nodename();
100
cba17aeb 101 $res->{name} = $nodename;
0854fb22 102
b0d26b8f 103 $res->{ip} = PVE::Network::get_ip_from_hostname($nodename);
0854fb22 104
cba17aeb 105 $res->{hostrsapubkey} = $hostrsapubkey;
0854fb22 106
cba17aeb
DM
107 if (! -f $rootrsapubkey_fn) {
108 unlink $rootrsakey_fn;
109 my $cmd = ['ssh-keygen', '-t', 'rsa', '-N', '', '-b', '2048',
110 '-f', $rootrsakey_fn];
1fb2ab76 111 PMG::Utils::run_silent_cmd($cmd);
cba17aeb
DM
112 }
113
114 my $rootrsapubkey = PVE::Tools::file_read_firstline($rootrsapubkey_fn);
115 $rootrsapubkey =~ s/^.*ssh-rsa\s+//i;
116 $rootrsapubkey =~ s/\s+root\@\S+\s*$//i;
117
118 die "unable to parse ${rootrsapubkey_fn}\n"
119 if $rootrsapubkey !~ m/^[A-Za-z0-9\.\/\+]{200,}$/;
120
121 $res->{rootrsapubkey} = $rootrsapubkey;
122
123 $res->{fingerprint} = read_local_ssl_cert_fingerprint();
124
125 return $res;
126}
127
128# X509 Certificate cache helper
129
130my $cert_cache_nodes = {};
131my $cert_cache_timestamp = time();
132my $cert_cache_fingerprints = {};
0854fb22 133
cba17aeb 134sub update_cert_cache {
0854fb22 135
cba17aeb
DM
136 $cert_cache_timestamp = time();
137
138 $cert_cache_fingerprints = {};
139 $cert_cache_nodes = {};
140
d8782874 141 my $cinfo = PMG::ClusterConfig->new();
cba17aeb
DM
142
143 foreach my $entry (values %{$cinfo->{ids}}) {
144 my $node = $entry->{name};
145 my $fp = $entry->{fingerprint};
146 if ($node && $fp) {
147 $cert_cache_fingerprints->{$fp} = 1;
148 $cert_cache_nodes->{$node} = $fp;
0854fb22
DM
149 }
150 }
151}
152
153# load and cache cert fingerprint once
154sub initialize_cert_cache {
155 my ($node) = @_;
156
cba17aeb 157 update_cert_cache()
0854fb22
DM
158 if defined($node) && !defined($cert_cache_nodes->{$node});
159}
160
161sub check_cert_fingerprint {
162 my ($cert) = @_;
163
164 # clear cache every 30 minutes at least
cba17aeb 165 update_cert_cache() if time() - $cert_cache_timestamp >= 60*30;
0854fb22
DM
166
167 # get fingerprint of server certificate
168 my $fp;
169 eval {
170 $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256');
171 };
172 return 0 if $@ || !defined($fp) || $fp eq ''; # error
173
174 my $check = sub {
175 for my $expected (keys %$cert_cache_fingerprints) {
176 return 1 if $fp eq $expected;
177 }
178 return 0;
179 };
180
cba17aeb 181 return 1 if $check->();
0854fb22
DM
182
183 # clear cache and retry at most once every minute
184 if (time() - $cert_cache_timestamp >= 60) {
185 syslog ('info', "Could not verify remote node certificate '$fp' with list of pinned certificates, refreshing cache");
186 update_cert_cache();
cba17aeb 187 return $check->();
0854fb22
DM
188 }
189
190 return 0;
191}
192
58072364
DM
193my $sshglobalknownhosts = "/etc/ssh/ssh_known_hosts2";
194my $rootsshauthkeys = "/root/.ssh/authorized_keys";
195my $ssh_rsa_id = "/root/.ssh/id_rsa.pub";
196
197sub update_ssh_keys {
198 my ($cinfo) = @_;
199
809dd92a 200 my $old = '';
58072364 201 my $data = '';
809dd92a 202
58072364
DM
203 foreach my $node (values %{$cinfo->{ids}}) {
204 $data .= "$node->{ip} ssh-rsa $node->{hostrsapubkey}\n";
205 $data .= "$node->{name} ssh-rsa $node->{hostrsapubkey}\n";
206 }
207
809dd92a
DM
208 $old = PVE::Tools::file_get_contents($sshglobalknownhosts, 1024*1024)
209 if -f $sshglobalknownhosts;
210
211 PVE::Tools::file_set_contents($sshglobalknownhosts, $data)
212 if $old ne $data;
58072364
DM
213
214 $data = '';
809dd92a 215 $old = '';
58072364
DM
216
217 # always add ourself
218 if (-f $ssh_rsa_id) {
219 my $pub = PVE::Tools::file_get_contents($ssh_rsa_id);
220 chomp($pub);
221 $data .= "$pub\n";
222 }
223
224 foreach my $node (values %{$cinfo->{ids}}) {
225 $data .= "ssh-rsa $node->{rootrsapubkey} root\@$node->{name}\n";
226 }
227
228 if (-f $rootsshauthkeys) {
a7c7cad7
DM
229 my $mykey = PVE::Tools::file_get_contents($rootsshauthkeys, 128*1024);
230 chomp($mykey);
231 $data .= "$mykey\n";
58072364
DM
232 }
233
234 my $newdata = "";
235 my $vhash = {};
236 my @lines = split(/\n/, $data);
237 foreach my $line (@lines) {
238 if ($line !~ /^#/ && $line =~ m/(^|\s)ssh-(rsa|dsa)\s+(\S+)\s+\S+$/) {
239 next if $vhash->{$3}++;
240 }
241 $newdata .= "$line\n";
242 }
243
809dd92a
DM
244 $old = PVE::Tools::file_get_contents($rootsshauthkeys, 1024*1024)
245 if -f $rootsshauthkeys;
246
247 PVE::Tools::file_set_contents($rootsshauthkeys, $newdata, 0600)
248 if $old ne $newdata;
58072364
DM
249}
250
a7c7cad7
DM
251my $cfgdir = '/etc/pmg';
252my $syncdir = "$cfgdir/master";
253
254my $cond_commit_synced_file = sub {
255 my ($filename, $dstfn) = @_;
256
257 $dstfn = "$cfgdir/$filename" if !defined($dstfn);
258 my $srcfn = "$syncdir/$filename";
259
260 if (! -f $srcfn) {
261 unlink $dstfn;
262 return;
263 }
264
265 my $new = PVE::Tools::file_get_contents($srcfn, 1024*1024);
266
267 if (-f $dstfn) {
268 my $old = PVE::Tools::file_get_contents($dstfn, 1024*1024);
269 return 0 if $new eq $old;
270 }
271
1dc946d4
DM
272 # set mtime (touch) to avoid time drift problems
273 utime(undef, undef, $srcfn);
274
a7c7cad7
DM
275 rename($srcfn, $dstfn) ||
276 die "cond_rename_file '$filename' failed - $!\n";
277
278 print STDERR "updated $dstfn\n";
279
280 return 1;
281};
282
c9dae0df
DM
283my $rsync_command = sub {
284 my ($host_key_alias, @args) = @_;
285
286 my $ssh_cmd = '--rsh=ssh -l root -o BatchMode=yes';
287 $ssh_cmd .= " -o HostKeyAlias=${host_key_alias}" if $host_key_alias;
288
289 my $cmd = ['rsync', $ssh_cmd, '-q', @args];
290
291 return $cmd;
292};
293
294sub sync_quarantine_files {
89eefd36 295 my ($host_ip, $host_name, $flistname, $rcid) = @_;
c9dae0df 296
9430b6d4
DM
297 my $spooldir = $PMG::MailQueue::spooldir;
298
89eefd36
DM
299 mkdir "$spooldir/cluster/";
300 my $syncdir = "$spooldir/cluster/$rcid";
301 mkdir $syncdir;
302
c9dae0df 303 my $cmd = $rsync_command->(
4daf3a35 304 $host_name, '--timeout', '10', "[${host_ip}]:$spooldir", $spooldir,
c9dae0df
DM
305 '--files-from', $flistname);
306
9430b6d4 307 PVE::Tools::run_command($cmd);
c9dae0df
DM
308}
309
310sub sync_spooldir {
311 my ($host_ip, $host_name, $rcid) = @_;
312
9430b6d4
DM
313 my $spooldir = $PMG::MailQueue::spooldir;
314
c9dae0df
DM
315 mkdir "$spooldir/cluster/";
316 my $syncdir = "$spooldir/cluster/$rcid";
317 mkdir $syncdir;
318
319 my $cmd = $rsync_command->(
4daf3a35 320 $host_name, '-aq', '--timeout', '10', "[${host_ip}]:$syncdir/", $syncdir);
c9dae0df
DM
321
322 foreach my $incl (('spam/', 'spam/*', 'spam/*/*', 'virus/', 'virus/*', 'virus/*/*')) {
323 push @$cmd, '--include', $incl;
324 }
325
326 push @$cmd, '--exclude', '*';
327
328 PVE::Tools::run_command($cmd);
329}
330
331sub sync_master_quar {
332 my ($host_ip, $host_name) = @_;
333
9430b6d4
DM
334 my $spooldir = $PMG::MailQueue::spooldir;
335
c9dae0df
DM
336 my $syncdir = "$spooldir/cluster/";
337 mkdir $syncdir;
338
339 my $cmd = $rsync_command->(
4daf3a35 340 $host_name, '-aq', '--timeout', '10', "[${host_ip}]:$syncdir", $syncdir);
c9dae0df
DM
341
342 PVE::Tools::run_command($cmd);
343}
344
58072364 345sub sync_config_from_master {
809ae8f4 346 my ($master_name, $master_ip, $noreload) = @_;
58072364 347
58072364 348 mkdir $syncdir;
a7c7cad7 349 File::Path::remove_tree($syncdir, {keep_root => 1});
58072364 350
a7c7cad7
DM
351 my $sa_conf_dir = "/etc/mail/spamassassin";
352 my $sa_custom_cf = "custom.cf";
957d0882 353 my $sa_rules_cf = "pmg-scores.cf";
58072364 354
c9dae0df 355 my $cmd = $rsync_command->(
f4b7112b 356 $master_name, '-aq',
957d0882 357 "[${master_ip}]:$cfgdir/* ${sa_conf_dir}/${sa_custom_cf} ${sa_conf_dir}/${sa_rules_cf}",
c9dae0df 358 "$syncdir/",
f4b7112b 359 '--exclude', 'master/',
c9dae0df
DM
360 '--exclude', '*~',
361 '--exclude', '*.db',
362 '--exclude', 'pmg-api.pem',
363 '--exclude', 'pmg-tls.pem',
364 );
58072364
DM
365
366 my $errmsg = "syncing master configuration from '${master_ip}' failed";
367 PVE::Tools::run_command($cmd, errmsg => $errmsg);
a7c7cad7
DM
368
369 # verify that the remote host is cluster master
370 open (my $fh, '<', "$syncdir/cluster.conf") ||
371 die "unable to open synced cluster.conf - $!\n";
a7c7cad7 372
809ae8f4
DM
373 my $cinfo = PMG::ClusterConfig::read_cluster_conf('cluster.conf', $fh);
374
375 if (!$cinfo->{master} || ($cinfo->{master}->{ip} ne $master_ip)) {
a7c7cad7
DM
376 die "host '$master_ip' is not cluster master\n";
377 }
378
809ae8f4
DM
379 my $role = $cinfo->{'local'}->{type} // '-';
380 die "local node '$cinfo->{local}->{name}' not part of cluster\n"
a7c7cad7
DM
381 if $role eq '-';
382
809ae8f4 383 die "local node '$cinfo->{local}->{name}' is new cluster master\n"
a7c7cad7
DM
384 if $role eq 'master';
385
a7c7cad7 386 $cond_commit_synced_file->('cluster.conf');
a7c7cad7 387
809dd92a
DM
388 update_ssh_keys($cinfo); # rewrite ssh keys
389
0757859a
DM
390 PMG::Fetchmail::update_fetchmail_default(0); # disable on slave
391
a7c7cad7
DM
392 my $files = [
393 'pmg-authkey.key',
394 'pmg-authkey.pub',
395 'pmg-csrf.key',
396 'ldap.conf',
397 'user.conf',
7cac3e28
DM
398 'domains',
399 'mynetworks',
400 'transport',
959aaeba 401 'tls_policy',
fd6feef4 402 'fetchmailrc',
a7c7cad7
DM
403 ];
404
405 foreach my $filename (@$files) {
406 $cond_commit_synced_file->($filename);
407 }
408
06cebf83
SI
409 my $dirs = [
410 'templates',
411 'dkim',
2cfdd9a1 412 'pbs',
06cebf83
SI
413 ];
414
415 foreach my $dir (@$dirs) {
416 my $srcdir = "$syncdir/$dir";
417
418 if ( -d $srcdir ) {
419 my $cmd = ['rsync', '-aq', '--delete-after', "$srcdir/", "$cfgdir/$dir"];
420 PVE::Tools::run_command($cmd);
421 }
422
423 }
0757859a 424
a7c7cad7
DM
425 my $force_restart = {};
426
957d0882
DC
427 for my $file (($sa_custom_cf, $sa_rules_cf)) {
428 if ($cond_commit_synced_file->($file, "${sa_conf_dir}/${file}")) {
429 $force_restart->{'pmg-smtp-filter'} = 1;
430 }
a7c7cad7
DM
431 }
432
433 $cond_commit_synced_file->('pmg.conf');
f4b7112b 434
5314a3a9 435 return $force_restart;
58072364
DM
436}
437
db303db4
DM
438sub sync_ruledb_from_master {
439 my ($ldb, $rdb, $ni, $ticket) = @_;
440
441 my $ruledb = PMG::RuleDB->new($ldb);
442 my $rulecache = PMG::RuleCache->new($ruledb);
443
444 my $conn = PVE::APIClient::LWP->new(
445 ticket => $ticket,
446 cookie_name => 'PMGAuthCookie',
447 host => $ni->{ip},
448 cached_fingerprints => {
449 $ni->{fingerprint} => 1,
450 });
451
452 my $digest = $conn->get("/config/ruledb/digest", {});
453
454 return if $digest eq $rulecache->{digest}; # no changes
455
456 syslog('info', "detected rule database changes - starting sync from '$ni->{ip}'");
457
458 eval {
459 $ldb->begin_work;
460
461 $ldb->do("DELETE FROM Rule");
462 $ldb->do("DELETE FROM RuleGroup");
463 $ldb->do("DELETE FROM ObjectGroup");
464 $ldb->do("DELETE FROM Object");
465 $ldb->do("DELETE FROM Attribut");
466
467 eval {
468 $rdb->begin_work;
469
470 # read a consistent snapshot
471 $rdb->do("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
472
473 PMG::DBTools::copy_table($ldb, $rdb, "Rule");
474 PMG::DBTools::copy_table($ldb, $rdb, "RuleGroup");
475 PMG::DBTools::copy_table($ldb, $rdb, "ObjectGroup");
476 PMG::DBTools::copy_table($ldb, $rdb, "Object", 'value');
477 PMG::DBTools::copy_table($ldb, $rdb, "Attribut", 'value');
478 };
479
480 $rdb->rollback; # end transaction
481
482 die $@ if $@;
483
484 # update sequences
485
486 $ldb->do("SELECT setval('rule_id_seq', max(id)+1) FROM Rule");
487 $ldb->do("SELECT setval('object_id_seq', max(id)+1) FROM Object");
488 $ldb->do("SELECT setval('objectgroup_id_seq', max(id)+1) FROM ObjectGroup");
489
490 $ldb->commit;
491 };
492 if (my $err = $@) {
493 $ldb->rollback;
494 die $err;
495 }
496
e29bdb0a
SI
497 PMG::DBTools::reload_ruledb();
498
db303db4
DM
499 syslog('info', "finished rule database sync from host '$ni->{ip}'");
500}
501
9430b6d4
DM
502sub sync_quarantine_db {
503 my ($ldb, $rdb, $ni, $rsynctime_ref) = @_;
504
505 my $rcid = $ni->{cid};
506
507 my $maxmails = 100000;
508
509 my $mscount = 0;
510
511 my $ctime = PMG::DBTools::get_remote_time($rdb);
512
513 my $maxcount = 1000;
514
515 my $count;
516
517 PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastid_CMailStore', -1, undef);
518
519 do { # get new values
520
521 $count = 0;
522
523 my $flistname = "/tmp/quarantinefilelist.$$";
524
525 eval {
526 $ldb->begin_work;
527
528 open(my $flistfh, '>', $flistname) ||
529 die "unable to open file '$flistname' - $!\n";
530
531 my $lastid = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastid_CMailStore');
532
533 # sync CMailStore
534
535 my $sth = $rdb->prepare(
536 "SELECT * from CMailstore WHERE cid = ? AND rid > ? " .
537 "ORDER BY cid,rid LIMIT ?");
538 $sth->execute($rcid, $lastid, $maxcount);
539
540 my $maxid;
541 my $callback = sub {
542 my $ref = shift;
543 $maxid = $ref->{rid};
89eefd36
DM
544 my $filename = $ref->{file};
545 # skip files generated before cluster was created
546 return if $filename !~ m!^cluster/!;
547 print $flistfh "$filename\n";
9430b6d4
DM
548 };
549
550 my $attrs = [qw(cid rid time qtype bytes spamlevel info sender header file)];
551 $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CMailStore', $attrs, $callback);
552
553 close($flistfh);
554
555 my $starttime = [ gettimeofday() ];
89eefd36 556 sync_quarantine_files($ni->{ip}, $ni->{name}, $flistname, $rcid);
9430b6d4
DM
557 $$rsynctime_ref += tv_interval($starttime);
558
559 if ($maxid) {
560 # sync CMSReceivers
561
562 $sth = $rdb->prepare(
563 "SELECT * from CMSReceivers WHERE " .
564 "CMailStore_CID = ? AND CMailStore_RID > ? " .
565 "AND CMailStore_RID <= ?");
566 $sth->execute($rcid, $lastid, $maxid);
567
666b5e8f 568 $attrs = [qw(cmailstore_cid cmailstore_rid pmail receiver ticketid status mtime)];
9430b6d4
DM
569 PMG::DBTools::copy_selected_data($ldb, $sth, 'CMSReceivers', $attrs);
570
571 PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastid_CMailStore', $maxid);
572 }
573
574 $ldb->commit;
575 };
576 my $err = $@;
577
578 unlink $flistname;
579
580 if ($err) {
581 $ldb->rollback;
582 die $err;
583 }
584
585 $mscount += $count;
586
d4c2f646 587 } while (($count >= $maxcount) && ($mscount < $maxmails));
9430b6d4
DM
588
589 PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastmt_CMSReceivers', 0, undef);
590
591 eval { # synchronize status updates
592 $ldb->begin_work;
593
594 my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers');
595
596 my $sth = $rdb->prepare ("SELECT * from CMSReceivers WHERE mtime >= ? AND status != 'N'");
597 $sth->execute($lastmt);
598
599 my $update_sth = $ldb->prepare(
600 "UPDATE CMSReceivers SET status = ? WHERE " .
666b5e8f 601 "CMailstore_CID = ? AND CMailstore_RID = ? AND TicketID = ?");
9430b6d4
DM
602 while (my $ref = $sth->fetchrow_hashref()) {
603 $update_sth->execute($ref->{status}, $ref->{cmailstore_cid},
666b5e8f 604 $ref->{cmailstore_rid}, $ref->{ticketid});
9430b6d4
DM
605 }
606
607 PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers', $ctime);
608
609 $ldb->commit;
610 };
611 if (my $err = $@) {
612 $ldb->rollback;
613 die $err;
614 }
615
616 return $mscount;
617}
618
619sub sync_statistic_db {
620 my ($ldb, $rdb, $ni) = @_;
621
622 my $rcid = $ni->{cid};
623
624 my $maxmails = 100000;
625
626 my $mscount = 0;
627
628 my $maxcount = 1000;
629
630 my $count;
631
632 PMG::DBTools::create_clusterinfo_default(
633 $ldb, $rcid, 'lastid_CStatistic', -1, undef);
634
635 do { # get new values
636
637 $count = 0;
638
639 eval {
640 $ldb->begin_work;
641
642 my $lastid = PMG::DBTools::read_int_clusterinfo(
643 $ldb, $rcid, 'lastid_CStatistic');
644
645 # sync CStatistic
646
647 my $sth = $rdb->prepare(
648 "SELECT * from CStatistic " .
649 "WHERE cid = ? AND rid > ? " .
650 "ORDER BY cid, rid LIMIT ?");
651 $sth->execute($rcid, $lastid, $maxcount);
652
653 my $maxid;
654 my $callback = sub {
655 my $ref = shift;
656 $maxid = $ref->{rid};
657 };
658
659 my $attrs = [qw(cid rid time bytes direction spamlevel ptime virusinfo sender)];
660 $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CStatistic', $attrs, $callback);
661
662 if ($maxid) {
663 # sync CReceivers
664
665 $sth = $rdb->prepare(
666 "SELECT * from CReceivers WHERE " .
667 "CStatistic_CID = ? AND CStatistic_RID > ? AND CStatistic_RID <= ?");
668 $sth->execute($rcid, $lastid, $maxid);
669
670 $attrs = [qw(cstatistic_cid cstatistic_rid blocked receiver)];
671 PMG::DBTools::copy_selected_data($ldb, $sth, 'CReceivers', $attrs);
9430b6d4
DM
672 }
673
64b7a2ea
DM
674 PMG::DBTools::write_maxint_clusterinfo ($ldb, $rcid, 'lastid_CStatistic', $maxid);
675
9430b6d4
DM
676 $ldb->commit;
677 };
678 if (my $err = $@) {
679 $ldb->rollback;
680 die $err;
681 }
682
683 $mscount += $count;
684
d4c2f646 685 } while (($count >= $maxcount) && ($mscount < $maxmails));
e86b85a3
DM
686
687 return $mscount;
9430b6d4
DM
688}
689
986eec31 690my $sync_generic_mtime_db = sub {
9430b6d4
DM
691 my ($ldb, $rdb, $ni, $table, $selectfunc, $mergefunc) = @_;
692
9430b6d4
DM
693 my $ctime = PMG::DBTools::get_remote_time($rdb);
694
695 PMG::DBTools::create_clusterinfo_default($ldb, $ni->{cid}, "lastmt_$table", 0, undef);
696
697 my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $ni->{cid}, "lastmt_$table");
698
4c93256a 699 my $sql_cmd = $selectfunc->($ctime, $lastmt);
9430b6d4 700
4c93256a 701 my $sth = $rdb->prepare($sql_cmd);
9430b6d4
DM
702
703 $sth->execute();
704
986eec31
DM
705 my $updates = 0;
706
4c93256a
DM
707 eval {
708 # use transaction to speedup things
709 my $max = 1000; # UPDATE MAX ENTRIES AT ONCE
4c93256a
DM
710 my $count = 0;
711 while (my $ref = $sth->fetchrow_hashref()) {
9d5bdd2a
DM
712 $ldb->begin_work if !$count;
713 $mergefunc->($ref);
4c93256a
DM
714 if (++$count >= $max) {
715 $count = 0;
716 $ldb->commit;
9430b6d4 717 }
986eec31 718 $updates++;
9430b6d4 719 }
9d5bdd2a
DM
720
721 $ldb->commit if $count;
9430b6d4 722 };
4c93256a
DM
723 if (my $err = $@) {
724 $ldb->rollback;
725 die $err;
9430b6d4
DM
726 }
727
9430b6d4
DM
728 PMG::DBTools::write_maxint_clusterinfo($ldb, $ni->{cid}, "lastmt_$table", $ctime);
729
986eec31
DM
730 return $updates;
731};
9430b6d4 732
5e1408fd
DM
733sub sync_localstat_db {
734 my ($dbh, $rdb, $ni) = @_;
735
736 my $rcid = $ni->{cid};
737
738 my $selectfunc = sub {
739 my ($ctime, $lastmt) = @_;
740 return "SELECT * from LocalStat WHERE mtime >= $lastmt AND cid = $rcid";
741 };
742
743 my $merge_sth = $dbh->prepare(
ebd19c79
DM
744 'INSERT INTO LocalStat (Time, RBLCount, PregreetCount, CID, MTime) ' .
745 'VALUES (?, ?, ?, ?, ?) ' .
5e1408fd 746 'ON CONFLICT (Time, CID) DO UPDATE SET ' .
ebd19c79 747 'RBLCount = excluded.RBLCount, PregreetCount = excluded.PregreetCount, MTime = excluded.MTime');
5e1408fd
DM
748
749 my $mergefunc = sub {
750 my ($ref) = @_;
751
ebd19c79 752 $merge_sth->execute($ref->{time}, $ref->{rblcount}, $ref->{pregreetcount}, $ref->{cid}, $ref->{mtime});
5e1408fd
DM
753 };
754
755 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'LocalStat', $selectfunc, $mergefunc);
756}
757
9430b6d4
DM
758sub sync_greylist_db {
759 my ($dbh, $rdb, $ni) = @_;
760
761 my $selectfunc = sub {
762 my ($ctime, $lastmt) = @_;
763 return "SELECT * from CGreylist WHERE extime >= $ctime AND " .
764 "mtime >= $lastmt AND CID != 0";
765 };
766
f61d5489
SI
767 # FIXME: drop Host column with PMG 7.0
768 my $merge_sth = $dbh->prepare(PMG::DBTools::cgreylist_merge_sql());
9430b6d4 769 my $mergefunc = sub {
986eec31 770 my ($ref) = @_;
9430b6d4 771
f61d5489
SI
772 my $ipnet = $ref->{ipnet};
773 $ipnet .= '.0/24' if $ipnet !~ /\/\d+$/;
f413f920 774 $merge_sth->execute(
f61d5489 775 $ipnet, 0, $ref->{sender}, $ref->{receiver},
f413f920
DM
776 $ref->{instance}, $ref->{rctime}, $ref->{extime}, $ref->{delay},
777 $ref->{blocked}, $ref->{passed}, 0, $ref->{cid});
9430b6d4
DM
778 };
779
986eec31 780 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'CGreylist', $selectfunc, $mergefunc);
9430b6d4
DM
781}
782
783sub sync_userprefs_db {
784 my ($dbh, $rdb, $ni) = @_;
785
786 my $selectfunc = sub {
787 my ($ctime, $lastmt) = @_;
788
789 return "SELECT * from UserPrefs WHERE mtime >= $lastmt";
790 };
791
7d13157e 792 my $merge_sth = $dbh->prepare(
471b05ed 793 "INSERT INTO UserPrefs (PMail, Name, Data, MTime) " .
286bc590 794 'VALUES (?, ?, ?, ?) ' .
471b05ed 795 'ON CONFLICT (PMail, Name) DO UPDATE SET ' .
dbdf1298 796 # Note: MTime = 0 ==> this is just a copy from somewhere else, not modified
3a9ffd85 797 'MTime = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN 0 ELSE UserPrefs.MTime END, ' .
0527ea1a 798 'Data = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN excluded.Data ELSE UserPrefs.Data END');
471b05ed 799
9430b6d4 800 my $mergefunc = sub {
986eec31 801 my ($ref) = @_;
9430b6d4 802
286bc590 803 $merge_sth->execute($ref->{pmail}, $ref->{name}, $ref->{data}, $ref->{mtime});
9430b6d4
DM
804 };
805
986eec31 806 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'UserPrefs', $selectfunc, $mergefunc);
9430b6d4
DM
807}
808
809sub sync_domainstat_db {
810 my ($dbh, $rdb, $ni) = @_;
811
812 my $selectfunc = sub {
813 my ($ctime, $lastmt) = @_;
814 return "SELECT * from DomainStat WHERE mtime >= $lastmt";
815 };
816
1c7ea32c
DM
817 my $merge_sth = $dbh->prepare(
818 'INSERT INTO Domainstat ' .
819 '(Time,Domain,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
820 'BouncesIn,BouncesOut,PTimeSum,Mtime) ' .
821 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
822 'ON CONFLICT (Time, Domain) DO UPDATE SET ' .
823 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
824 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
825 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
826 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
827 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
828 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
829
9430b6d4 830 my $mergefunc = sub {
986eec31 831 my ($ref) = @_;
9430b6d4 832
1c7ea32c
DM
833 $merge_sth->execute(
834 $ref->{time}, $ref->{domain}, $ref->{countin}, $ref->{countout},
835 $ref->{bytesin}, $ref->{bytesout},
836 $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout},
837 $ref->{bouncesin}, $ref->{bouncesout}, $ref->{ptimesum}, $ref->{mtime});
9430b6d4
DM
838 };
839
986eec31 840 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DomainStat', $selectfunc, $mergefunc);
9430b6d4
DM
841}
842
843sub sync_dailystat_db {
844 my ($dbh, $rdb, $ni) = @_;
845
846 my $selectfunc = sub {
847 my ($ctime, $lastmt) = @_;
848 return "SELECT * from DailyStat WHERE mtime >= $lastmt";
8bf584ae
DM
849 };
850
851 my $merge_sth = $dbh->prepare(
852 'INSERT INTO DailyStat ' .
853 '(Time,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' .
854 'BouncesIn,BouncesOut,GreylistCount,SPFCount,RBLCount,PTimeSum,Mtime) ' .
855 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' .
856 'ON CONFLICT (Time) DO UPDATE SET ' .
857 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' .
858 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' .
859 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' .
860 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' .
861 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' .
862 'GreylistCount = excluded.GreylistCount, SPFCount = excluded.SpfCount, ' .
863 'RBLCount = excluded.RBLCount, ' .
864 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime');
9430b6d4
DM
865
866 my $mergefunc = sub {
986eec31 867 my ($ref) = @_;
9430b6d4 868
dbdf1298 869 $merge_sth->execute(
8bf584ae
DM
870 $ref->{time}, $ref->{countin}, $ref->{countout},
871 $ref->{bytesin}, $ref->{bytesout},
872 $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout},
873 $ref->{bouncesin}, $ref->{bouncesout}, $ref->{greylistcount},
874 $ref->{spfcount}, $ref->{rblcount}, $ref->{ptimesum}, $ref->{mtime});
9430b6d4
DM
875 };
876
986eec31 877 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DailyStat', $selectfunc, $mergefunc);
9430b6d4
DM
878}
879
880sub sync_virusinfo_db {
881 my ($dbh, $rdb, $ni) = @_;
882
883 my $selectfunc = sub {
884 my ($ctime, $lastmt) = @_;
885 return "SELECT * from VirusInfo WHERE mtime >= $lastmt";
886 };
887
80615ad6
DM
888 my $merge_sth = $dbh->prepare(
889 'INSERT INTO VirusInfo (Time,Name,Count,MTime) ' .
890 'VALUES (?,?,?,?) ' .
891 'ON CONFLICT (Time,Name) DO UPDATE SET ' .
892 'Count = excluded.Count , MTime = excluded.MTime');
893
9430b6d4 894 my $mergefunc = sub {
986eec31 895 my ($ref) = @_;
9430b6d4 896
80615ad6 897 $merge_sth->execute($ref->{time}, $ref->{name}, $ref->{count}, $ref->{mtime});
9430b6d4
DM
898 };
899
986eec31 900 return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'VirusInfo', $selectfunc, $mergefunc);
9430b6d4
DM
901}
902
903sub sync_deleted_nodes_from_master {
904 my ($ldb, $masterdb, $cinfo, $masterni, $rsynctime_ref) = @_;
905
906 my $rsynctime = 0;
907
908 my $cid_hash = {}; # fast lookup
5c6e6eeb 909 foreach my $ni (values %{$cinfo->{ids}}) {
9430b6d4
DM
910 $cid_hash->{$ni->{cid}} = $ni;
911 }
912
913 my $spooldir = $PMG::MailQueue::spooldir;
914
5c6e6eeb
DM
915 my $maxcid = $cinfo->{master}->{maxcid} // 0;
916
917 for (my $rcid = 1; $rcid <= $maxcid; $rcid++) {
9430b6d4
DM
918 next if $cid_hash->{$rcid};
919
920 my $done_marker = "$spooldir/cluster/$rcid/.synced-deleted-node";
921
922 next if -f $done_marker; # already synced
923
924 syslog('info', "syncing deleted node $rcid from master '$masterni->{ip}'");
925
926 my $starttime = [ gettimeofday() ];
927 sync_spooldir($masterni->{ip}, $masterni->{name}, $rcid);
928 $$rsynctime_ref += tv_interval($starttime);
929
930 my $fake_ni = {
931 ip => $masterni->{ip},
932 name => $masterni->{name},
933 cid => $rcid,
934 };
935
936 sync_quarantine_db($ldb, $masterdb, $fake_ni);
937
938 sync_statistic_db ($ldb, $masterdb, $fake_ni);
939
940 open(my $fh, ">>", $done_marker);
941 }
942}
943
944
0854fb22 9451;