]>
Commit | Line | Data |
---|---|---|
0854fb22 DM |
1 | package PMG::Cluster; |
2 | ||
3 | use strict; | |
4 | use warnings; | |
45e68618 | 5 | use Data::Dumper; |
0854fb22 | 6 | use Socket; |
cfdf6608 | 7 | use File::Path; |
9430b6d4 | 8 | use Time::HiRes qw (gettimeofday tv_interval); |
45e68618 | 9 | |
a7c7cad7 | 10 | use PVE::SafeSyslog; |
0854fb22 DM |
11 | use PVE::Tools; |
12 | use PVE::INotify; | |
13 | ||
1fb2ab76 | 14 | use PMG::Utils; |
a7c7cad7 | 15 | use PMG::Config; |
9f67f5b3 | 16 | use PMG::ClusterConfig; |
db303db4 DM |
17 | use PMG::RuleDB; |
18 | use PMG::RuleCache; | |
9430b6d4 | 19 | use PMG::MailQueue; |
db303db4 | 20 | use PVE::APIClient::LWP; |
0854fb22 | 21 | |
8737f93a DM |
22 | sub remote_node_ip { |
23 | my ($nodename, $noerr) = @_; | |
24 | ||
d8782874 | 25 | my $cinfo = PMG::ClusterConfig->new(); |
8737f93a | 26 | |
45e68618 | 27 | foreach my $entry (values %{$cinfo->{ids}}) { |
8737f93a DM |
28 | if ($entry->{name} eq $nodename) { |
29 | my $ip = $entry->{ip}; | |
30 | return $ip if !wantarray; | |
31 | my $family = PVE::Tools::get_host_address_family($ip); | |
32 | return ($ip, $family); | |
33 | } | |
34 | } | |
35 | ||
36 | # fallback: try to get IP by other means | |
9f67f5b3 | 37 | return PMG::Utils::lookup_node_ip($nodename, $noerr); |
8737f93a DM |
38 | } |
39 | ||
d2e43f9e DM |
40 | sub get_master_node { |
41 | my ($cinfo) = @_; | |
42 | ||
d8782874 | 43 | $cinfo = PMG::ClusterConfig->new() if !$cinfo; |
d2e43f9e DM |
44 | |
45 | return $cinfo->{master}->{name} if defined($cinfo->{master}); | |
46 | ||
47 | return 'localhost'; | |
48 | } | |
49 | ||
cba17aeb DM |
50 | sub read_local_ssl_cert_fingerprint { |
51 | my $cert_path = "/etc/pmg/pmg-api.pem"; | |
0854fb22 | 52 | |
cba17aeb DM |
53 | my $cert; |
54 | eval { | |
55 | my $bio = Net::SSLeay::BIO_new_file($cert_path, 'r'); | |
56 | $cert = Net::SSLeay::PEM_read_bio_X509($bio); | |
57 | Net::SSLeay::BIO_free($bio); | |
58 | }; | |
59 | if (my $err = $@) { | |
60 | die "unable to read certificate '$cert_path' - $err\n"; | |
61 | } | |
0854fb22 | 62 | |
cba17aeb DM |
63 | if (!defined($cert)) { |
64 | die "unable to read certificate '$cert_path' - got empty value\n"; | |
65 | } | |
66 | ||
67 | my $fp; | |
68 | eval { | |
69 | $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256'); | |
70 | }; | |
71 | if (my $err = $@) { | |
72 | die "unable to get fingerprint for '$cert_path' - $err\n"; | |
73 | } | |
0854fb22 | 74 | |
cba17aeb DM |
75 | if (!defined($fp) || $fp eq '') { |
76 | die "unable to get fingerprint for '$cert_path' - got empty value\n"; | |
77 | } | |
0854fb22 | 78 | |
cba17aeb DM |
79 | return $fp; |
80 | } | |
0854fb22 | 81 | |
cba17aeb DM |
82 | my $hostrsapubkey_fn = '/etc/ssh/ssh_host_rsa_key.pub'; |
83 | my $rootrsakey_fn = '/root/.ssh/id_rsa'; | |
84 | my $rootrsapubkey_fn = '/root/.ssh/id_rsa.pub'; | |
0854fb22 | 85 | |
cba17aeb DM |
86 | sub read_local_cluster_info { |
87 | ||
88 | my $res = {}; | |
89 | ||
90 | my $hostrsapubkey = PVE::Tools::file_read_firstline($hostrsapubkey_fn); | |
91 | $hostrsapubkey =~ s/^.*ssh-rsa\s+//i; | |
92 | $hostrsapubkey =~ s/\s+root\@\S+\s*$//i; | |
93 | ||
94 | die "unable to parse ${hostrsapubkey_fn}\n" | |
95 | if $hostrsapubkey !~ m/^[A-Za-z0-9\.\/\+]{200,}$/; | |
0854fb22 DM |
96 | |
97 | my $nodename = PVE::INotify::nodename(); | |
98 | ||
cba17aeb | 99 | $res->{name} = $nodename; |
0854fb22 | 100 | |
cba17aeb | 101 | $res->{ip} = PMG::Utils::lookup_node_ip($nodename); |
0854fb22 | 102 | |
cba17aeb | 103 | $res->{hostrsapubkey} = $hostrsapubkey; |
0854fb22 | 104 | |
cba17aeb DM |
105 | if (! -f $rootrsapubkey_fn) { |
106 | unlink $rootrsakey_fn; | |
107 | my $cmd = ['ssh-keygen', '-t', 'rsa', '-N', '', '-b', '2048', | |
108 | '-f', $rootrsakey_fn]; | |
1fb2ab76 | 109 | PMG::Utils::run_silent_cmd($cmd); |
cba17aeb DM |
110 | } |
111 | ||
112 | my $rootrsapubkey = PVE::Tools::file_read_firstline($rootrsapubkey_fn); | |
113 | $rootrsapubkey =~ s/^.*ssh-rsa\s+//i; | |
114 | $rootrsapubkey =~ s/\s+root\@\S+\s*$//i; | |
115 | ||
116 | die "unable to parse ${rootrsapubkey_fn}\n" | |
117 | if $rootrsapubkey !~ m/^[A-Za-z0-9\.\/\+]{200,}$/; | |
118 | ||
119 | $res->{rootrsapubkey} = $rootrsapubkey; | |
120 | ||
121 | $res->{fingerprint} = read_local_ssl_cert_fingerprint(); | |
122 | ||
123 | return $res; | |
124 | } | |
125 | ||
126 | # X509 Certificate cache helper | |
127 | ||
128 | my $cert_cache_nodes = {}; | |
129 | my $cert_cache_timestamp = time(); | |
130 | my $cert_cache_fingerprints = {}; | |
0854fb22 | 131 | |
cba17aeb | 132 | sub update_cert_cache { |
0854fb22 | 133 | |
cba17aeb DM |
134 | $cert_cache_timestamp = time(); |
135 | ||
136 | $cert_cache_fingerprints = {}; | |
137 | $cert_cache_nodes = {}; | |
138 | ||
d8782874 | 139 | my $cinfo = PMG::ClusterConfig->new(); |
cba17aeb DM |
140 | |
141 | foreach my $entry (values %{$cinfo->{ids}}) { | |
142 | my $node = $entry->{name}; | |
143 | my $fp = $entry->{fingerprint}; | |
144 | if ($node && $fp) { | |
145 | $cert_cache_fingerprints->{$fp} = 1; | |
146 | $cert_cache_nodes->{$node} = $fp; | |
0854fb22 DM |
147 | } |
148 | } | |
149 | } | |
150 | ||
151 | # load and cache cert fingerprint once | |
152 | sub initialize_cert_cache { | |
153 | my ($node) = @_; | |
154 | ||
cba17aeb | 155 | update_cert_cache() |
0854fb22 DM |
156 | if defined($node) && !defined($cert_cache_nodes->{$node}); |
157 | } | |
158 | ||
159 | sub check_cert_fingerprint { | |
160 | my ($cert) = @_; | |
161 | ||
162 | # clear cache every 30 minutes at least | |
cba17aeb | 163 | update_cert_cache() if time() - $cert_cache_timestamp >= 60*30; |
0854fb22 DM |
164 | |
165 | # get fingerprint of server certificate | |
166 | my $fp; | |
167 | eval { | |
168 | $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256'); | |
169 | }; | |
170 | return 0 if $@ || !defined($fp) || $fp eq ''; # error | |
171 | ||
172 | my $check = sub { | |
173 | for my $expected (keys %$cert_cache_fingerprints) { | |
174 | return 1 if $fp eq $expected; | |
175 | } | |
176 | return 0; | |
177 | }; | |
178 | ||
cba17aeb | 179 | return 1 if $check->(); |
0854fb22 DM |
180 | |
181 | # clear cache and retry at most once every minute | |
182 | if (time() - $cert_cache_timestamp >= 60) { | |
183 | syslog ('info', "Could not verify remote node certificate '$fp' with list of pinned certificates, refreshing cache"); | |
184 | update_cert_cache(); | |
cba17aeb | 185 | return $check->(); |
0854fb22 DM |
186 | } |
187 | ||
188 | return 0; | |
189 | } | |
190 | ||
58072364 DM |
191 | my $sshglobalknownhosts = "/etc/ssh/ssh_known_hosts2"; |
192 | my $rootsshauthkeys = "/root/.ssh/authorized_keys"; | |
193 | my $ssh_rsa_id = "/root/.ssh/id_rsa.pub"; | |
194 | ||
195 | sub update_ssh_keys { | |
196 | my ($cinfo) = @_; | |
197 | ||
198 | my $data = ''; | |
199 | foreach my $node (values %{$cinfo->{ids}}) { | |
200 | $data .= "$node->{ip} ssh-rsa $node->{hostrsapubkey}\n"; | |
201 | $data .= "$node->{name} ssh-rsa $node->{hostrsapubkey}\n"; | |
202 | } | |
203 | ||
204 | PVE::Tools::file_set_contents($sshglobalknownhosts, $data); | |
205 | ||
206 | $data = ''; | |
207 | ||
208 | # always add ourself | |
209 | if (-f $ssh_rsa_id) { | |
210 | my $pub = PVE::Tools::file_get_contents($ssh_rsa_id); | |
211 | chomp($pub); | |
212 | $data .= "$pub\n"; | |
213 | } | |
214 | ||
215 | foreach my $node (values %{$cinfo->{ids}}) { | |
216 | $data .= "ssh-rsa $node->{rootrsapubkey} root\@$node->{name}\n"; | |
217 | } | |
218 | ||
219 | if (-f $rootsshauthkeys) { | |
a7c7cad7 DM |
220 | my $mykey = PVE::Tools::file_get_contents($rootsshauthkeys, 128*1024); |
221 | chomp($mykey); | |
222 | $data .= "$mykey\n"; | |
58072364 DM |
223 | } |
224 | ||
225 | my $newdata = ""; | |
226 | my $vhash = {}; | |
227 | my @lines = split(/\n/, $data); | |
228 | foreach my $line (@lines) { | |
229 | if ($line !~ /^#/ && $line =~ m/(^|\s)ssh-(rsa|dsa)\s+(\S+)\s+\S+$/) { | |
230 | next if $vhash->{$3}++; | |
231 | } | |
232 | $newdata .= "$line\n"; | |
233 | } | |
234 | ||
235 | PVE::Tools::file_set_contents($rootsshauthkeys, $newdata, 0600); | |
236 | } | |
237 | ||
a7c7cad7 DM |
238 | my $cfgdir = '/etc/pmg'; |
239 | my $syncdir = "$cfgdir/master"; | |
240 | ||
241 | my $cond_commit_synced_file = sub { | |
242 | my ($filename, $dstfn) = @_; | |
243 | ||
244 | $dstfn = "$cfgdir/$filename" if !defined($dstfn); | |
245 | my $srcfn = "$syncdir/$filename"; | |
246 | ||
247 | if (! -f $srcfn) { | |
248 | unlink $dstfn; | |
249 | return; | |
250 | } | |
251 | ||
252 | my $new = PVE::Tools::file_get_contents($srcfn, 1024*1024); | |
253 | ||
254 | if (-f $dstfn) { | |
255 | my $old = PVE::Tools::file_get_contents($dstfn, 1024*1024); | |
256 | return 0 if $new eq $old; | |
257 | } | |
258 | ||
259 | rename($srcfn, $dstfn) || | |
260 | die "cond_rename_file '$filename' failed - $!\n"; | |
261 | ||
262 | print STDERR "updated $dstfn\n"; | |
263 | ||
264 | return 1; | |
265 | }; | |
266 | ||
c9dae0df DM |
267 | my $rsync_command = sub { |
268 | my ($host_key_alias, @args) = @_; | |
269 | ||
270 | my $ssh_cmd = '--rsh=ssh -l root -o BatchMode=yes'; | |
271 | $ssh_cmd .= " -o HostKeyAlias=${host_key_alias}" if $host_key_alias; | |
272 | ||
273 | my $cmd = ['rsync', $ssh_cmd, '-q', @args]; | |
274 | ||
275 | return $cmd; | |
276 | }; | |
277 | ||
278 | sub sync_quarantine_files { | |
279 | my ($host_ip, $host_name, $flistname) = @_; | |
280 | ||
9430b6d4 DM |
281 | my $spooldir = $PMG::MailQueue::spooldir; |
282 | ||
c9dae0df DM |
283 | my $cmd = $rsync_command->( |
284 | $host_name, '--timeout', '10', "${host_ip}:$spooldir", $spooldir, | |
285 | '--files-from', $flistname); | |
286 | ||
9430b6d4 | 287 | PVE::Tools::run_command($cmd); |
c9dae0df DM |
288 | } |
289 | ||
290 | sub sync_spooldir { | |
291 | my ($host_ip, $host_name, $rcid) = @_; | |
292 | ||
9430b6d4 DM |
293 | my $spooldir = $PMG::MailQueue::spooldir; |
294 | ||
c9dae0df DM |
295 | mkdir "$spooldir/cluster/"; |
296 | my $syncdir = "$spooldir/cluster/$rcid"; | |
297 | mkdir $syncdir; | |
298 | ||
299 | my $cmd = $rsync_command->( | |
300 | $host_name, '-aq', '--timeout', '10', "${host_ip}:$syncdir/", $syncdir); | |
301 | ||
302 | foreach my $incl (('spam/', 'spam/*', 'spam/*/*', 'virus/', 'virus/*', 'virus/*/*')) { | |
303 | push @$cmd, '--include', $incl; | |
304 | } | |
305 | ||
306 | push @$cmd, '--exclude', '*'; | |
307 | ||
308 | PVE::Tools::run_command($cmd); | |
309 | } | |
310 | ||
311 | sub sync_master_quar { | |
312 | my ($host_ip, $host_name) = @_; | |
313 | ||
9430b6d4 DM |
314 | my $spooldir = $PMG::MailQueue::spooldir; |
315 | ||
c9dae0df DM |
316 | my $syncdir = "$spooldir/cluster/"; |
317 | mkdir $syncdir; | |
318 | ||
319 | my $cmd = $rsync_command->( | |
320 | $host_name, '-aq', '--timeout', '10', "${host_ip}:$syncdir", $syncdir); | |
321 | ||
322 | PVE::Tools::run_command($cmd); | |
323 | } | |
324 | ||
58072364 | 325 | sub sync_config_from_master { |
809ae8f4 | 326 | my ($master_name, $master_ip, $noreload) = @_; |
58072364 | 327 | |
58072364 | 328 | mkdir $syncdir; |
a7c7cad7 | 329 | File::Path::remove_tree($syncdir, {keep_root => 1}); |
58072364 | 330 | |
a7c7cad7 DM |
331 | my $sa_conf_dir = "/etc/mail/spamassassin"; |
332 | my $sa_custom_cf = "custom.cf"; | |
58072364 | 333 | |
c9dae0df DM |
334 | my $cmd = $rsync_command->( |
335 | $master_name, '-lpgoq', | |
336 | "${master_ip}:$cfgdir/* ${sa_conf_dir}/${sa_custom_cf}", | |
337 | "$syncdir/", | |
338 | '--exclude', '*~', | |
339 | '--exclude', '*.db', | |
340 | '--exclude', 'pmg-api.pem', | |
341 | '--exclude', 'pmg-tls.pem', | |
342 | ); | |
58072364 DM |
343 | |
344 | my $errmsg = "syncing master configuration from '${master_ip}' failed"; | |
345 | PVE::Tools::run_command($cmd, errmsg => $errmsg); | |
a7c7cad7 DM |
346 | |
347 | # verify that the remote host is cluster master | |
348 | open (my $fh, '<', "$syncdir/cluster.conf") || | |
349 | die "unable to open synced cluster.conf - $!\n"; | |
a7c7cad7 | 350 | |
809ae8f4 DM |
351 | my $cinfo = PMG::ClusterConfig::read_cluster_conf('cluster.conf', $fh); |
352 | ||
353 | if (!$cinfo->{master} || ($cinfo->{master}->{ip} ne $master_ip)) { | |
a7c7cad7 DM |
354 | die "host '$master_ip' is not cluster master\n"; |
355 | } | |
356 | ||
809ae8f4 DM |
357 | my $role = $cinfo->{'local'}->{type} // '-'; |
358 | die "local node '$cinfo->{local}->{name}' not part of cluster\n" | |
a7c7cad7 DM |
359 | if $role eq '-'; |
360 | ||
809ae8f4 | 361 | die "local node '$cinfo->{local}->{name}' is new cluster master\n" |
a7c7cad7 DM |
362 | if $role eq 'master'; |
363 | ||
a7c7cad7 | 364 | $cond_commit_synced_file->('cluster.conf'); |
a7c7cad7 DM |
365 | |
366 | my $files = [ | |
367 | 'pmg-authkey.key', | |
368 | 'pmg-authkey.pub', | |
369 | 'pmg-csrf.key', | |
370 | 'ldap.conf', | |
371 | 'user.conf', | |
372 | ]; | |
373 | ||
374 | foreach my $filename (@$files) { | |
375 | $cond_commit_synced_file->($filename); | |
376 | } | |
377 | ||
378 | my $force_restart = {}; | |
379 | ||
380 | if ($cond_commit_synced_file->($sa_custom_cf, "${sa_conf_dir}/${sa_custom_cf}")) { | |
381 | $force_restart->{spam} = 1; | |
382 | } | |
383 | ||
384 | $cond_commit_synced_file->('pmg.conf'); | |
385 | ||
386 | my $cfg = PMG::Config->new(); | |
387 | ||
388 | $cfg->rewrite_config(1, $force_restart); | |
58072364 DM |
389 | } |
390 | ||
db303db4 DM |
391 | sub sync_ruledb_from_master { |
392 | my ($ldb, $rdb, $ni, $ticket) = @_; | |
393 | ||
394 | my $ruledb = PMG::RuleDB->new($ldb); | |
395 | my $rulecache = PMG::RuleCache->new($ruledb); | |
396 | ||
397 | my $conn = PVE::APIClient::LWP->new( | |
398 | ticket => $ticket, | |
399 | cookie_name => 'PMGAuthCookie', | |
400 | host => $ni->{ip}, | |
401 | cached_fingerprints => { | |
402 | $ni->{fingerprint} => 1, | |
403 | }); | |
404 | ||
405 | my $digest = $conn->get("/config/ruledb/digest", {}); | |
406 | ||
407 | return if $digest eq $rulecache->{digest}; # no changes | |
408 | ||
409 | syslog('info', "detected rule database changes - starting sync from '$ni->{ip}'"); | |
410 | ||
411 | eval { | |
412 | $ldb->begin_work; | |
413 | ||
414 | $ldb->do("DELETE FROM Rule"); | |
415 | $ldb->do("DELETE FROM RuleGroup"); | |
416 | $ldb->do("DELETE FROM ObjectGroup"); | |
417 | $ldb->do("DELETE FROM Object"); | |
418 | $ldb->do("DELETE FROM Attribut"); | |
419 | ||
420 | eval { | |
421 | $rdb->begin_work; | |
422 | ||
423 | # read a consistent snapshot | |
424 | $rdb->do("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); | |
425 | ||
426 | PMG::DBTools::copy_table($ldb, $rdb, "Rule"); | |
427 | PMG::DBTools::copy_table($ldb, $rdb, "RuleGroup"); | |
428 | PMG::DBTools::copy_table($ldb, $rdb, "ObjectGroup"); | |
429 | PMG::DBTools::copy_table($ldb, $rdb, "Object", 'value'); | |
430 | PMG::DBTools::copy_table($ldb, $rdb, "Attribut", 'value'); | |
431 | }; | |
432 | ||
433 | $rdb->rollback; # end transaction | |
434 | ||
435 | die $@ if $@; | |
436 | ||
437 | # update sequences | |
438 | ||
439 | $ldb->do("SELECT setval('rule_id_seq', max(id)+1) FROM Rule"); | |
440 | $ldb->do("SELECT setval('object_id_seq', max(id)+1) FROM Object"); | |
441 | $ldb->do("SELECT setval('objectgroup_id_seq', max(id)+1) FROM ObjectGroup"); | |
442 | ||
443 | $ldb->commit; | |
444 | }; | |
445 | if (my $err = $@) { | |
446 | $ldb->rollback; | |
447 | die $err; | |
448 | } | |
449 | ||
450 | syslog('info', "finished rule database sync from host '$ni->{ip}'"); | |
451 | } | |
452 | ||
9430b6d4 DM |
453 | sub sync_quarantine_db { |
454 | my ($ldb, $rdb, $ni, $rsynctime_ref) = @_; | |
455 | ||
456 | my $rcid = $ni->{cid}; | |
457 | ||
458 | my $maxmails = 100000; | |
459 | ||
460 | my $mscount = 0; | |
461 | ||
462 | my $ctime = PMG::DBTools::get_remote_time($rdb); | |
463 | ||
464 | my $maxcount = 1000; | |
465 | ||
466 | my $count; | |
467 | ||
468 | PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastid_CMailStore', -1, undef); | |
469 | ||
470 | do { # get new values | |
471 | ||
472 | $count = 0; | |
473 | ||
474 | my $flistname = "/tmp/quarantinefilelist.$$"; | |
475 | ||
476 | eval { | |
477 | $ldb->begin_work; | |
478 | ||
479 | open(my $flistfh, '>', $flistname) || | |
480 | die "unable to open file '$flistname' - $!\n"; | |
481 | ||
482 | my $lastid = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastid_CMailStore'); | |
483 | ||
484 | # sync CMailStore | |
485 | ||
486 | my $sth = $rdb->prepare( | |
487 | "SELECT * from CMailstore WHERE cid = ? AND rid > ? " . | |
488 | "ORDER BY cid,rid LIMIT ?"); | |
489 | $sth->execute($rcid, $lastid, $maxcount); | |
490 | ||
491 | my $maxid; | |
492 | my $callback = sub { | |
493 | my $ref = shift; | |
494 | $maxid = $ref->{rid}; | |
495 | print $flistfh "$ref->{file}\n"; | |
496 | }; | |
497 | ||
498 | my $attrs = [qw(cid rid time qtype bytes spamlevel info sender header file)]; | |
499 | $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CMailStore', $attrs, $callback); | |
500 | ||
501 | close($flistfh); | |
502 | ||
503 | my $starttime = [ gettimeofday() ]; | |
504 | sync_quarantine_files($ni->{ip}, $ni->{name}, $flistname); | |
505 | $$rsynctime_ref += tv_interval($starttime); | |
506 | ||
507 | if ($maxid) { | |
508 | # sync CMSReceivers | |
509 | ||
510 | $sth = $rdb->prepare( | |
511 | "SELECT * from CMSReceivers WHERE " . | |
512 | "CMailStore_CID = ? AND CMailStore_RID > ? " . | |
513 | "AND CMailStore_RID <= ?"); | |
514 | $sth->execute($rcid, $lastid, $maxid); | |
515 | ||
516 | $attrs = [qw(cmailstore_cid cmailstore_rid pmail receiver ticketid status mtime)]; | |
517 | PMG::DBTools::copy_selected_data($ldb, $sth, 'CMSReceivers', $attrs); | |
518 | ||
519 | PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastid_CMailStore', $maxid); | |
520 | } | |
521 | ||
522 | $ldb->commit; | |
523 | }; | |
524 | my $err = $@; | |
525 | ||
526 | unlink $flistname; | |
527 | ||
528 | if ($err) { | |
529 | $ldb->rollback; | |
530 | die $err; | |
531 | } | |
532 | ||
533 | $mscount += $count; | |
534 | ||
535 | last if $mscount >= $maxmails; | |
536 | ||
537 | } while ($count >= $maxcount); | |
538 | ||
539 | PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastmt_CMSReceivers', 0, undef); | |
540 | ||
541 | eval { # synchronize status updates | |
542 | $ldb->begin_work; | |
543 | ||
544 | my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers'); | |
545 | ||
546 | my $sth = $rdb->prepare ("SELECT * from CMSReceivers WHERE mtime >= ? AND status != 'N'"); | |
547 | $sth->execute($lastmt); | |
548 | ||
549 | my $update_sth = $ldb->prepare( | |
550 | "UPDATE CMSReceivers SET status = ? WHERE " . | |
551 | "CMailstore_CID = ? AND CMailstore_RID = ? AND PMail = ?;"); | |
552 | while (my $ref = $sth->fetchrow_hashref()) { | |
553 | $update_sth->execute($ref->{status}, $ref->{cmailstore_cid}, | |
554 | $ref->{cmailstore_rid}, $ref->{pmail}); | |
555 | } | |
556 | ||
557 | PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers', $ctime); | |
558 | ||
559 | $ldb->commit; | |
560 | }; | |
561 | if (my $err = $@) { | |
562 | $ldb->rollback; | |
563 | die $err; | |
564 | } | |
565 | ||
566 | return $mscount; | |
567 | } | |
568 | ||
569 | sub sync_statistic_db { | |
570 | my ($ldb, $rdb, $ni) = @_; | |
571 | ||
572 | my $rcid = $ni->{cid}; | |
573 | ||
574 | my $maxmails = 100000; | |
575 | ||
576 | my $mscount = 0; | |
577 | ||
578 | my $maxcount = 1000; | |
579 | ||
580 | my $count; | |
581 | ||
582 | PMG::DBTools::create_clusterinfo_default( | |
583 | $ldb, $rcid, 'lastid_CStatistic', -1, undef); | |
584 | ||
585 | do { # get new values | |
586 | ||
587 | $count = 0; | |
588 | ||
589 | eval { | |
590 | $ldb->begin_work; | |
591 | ||
592 | my $lastid = PMG::DBTools::read_int_clusterinfo( | |
593 | $ldb, $rcid, 'lastid_CStatistic'); | |
594 | ||
595 | # sync CStatistic | |
596 | ||
597 | my $sth = $rdb->prepare( | |
598 | "SELECT * from CStatistic " . | |
599 | "WHERE cid = ? AND rid > ? " . | |
600 | "ORDER BY cid, rid LIMIT ?"); | |
601 | $sth->execute($rcid, $lastid, $maxcount); | |
602 | ||
603 | my $maxid; | |
604 | my $callback = sub { | |
605 | my $ref = shift; | |
606 | $maxid = $ref->{rid}; | |
607 | }; | |
608 | ||
609 | my $attrs = [qw(cid rid time bytes direction spamlevel ptime virusinfo sender)]; | |
610 | $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CStatistic', $attrs, $callback); | |
611 | ||
612 | if ($maxid) { | |
613 | # sync CReceivers | |
614 | ||
615 | $sth = $rdb->prepare( | |
616 | "SELECT * from CReceivers WHERE " . | |
617 | "CStatistic_CID = ? AND CStatistic_RID > ? AND CStatistic_RID <= ?"); | |
618 | $sth->execute($rcid, $lastid, $maxid); | |
619 | ||
620 | $attrs = [qw(cstatistic_cid cstatistic_rid blocked receiver)]; | |
621 | PMG::DBTools::copy_selected_data($ldb, $sth, 'CReceivers', $attrs); | |
622 | ||
623 | PMG::DBTools::write_maxint_clusterinfo ($ldb, $rcid, 'lastid_CStatistic', $maxid); | |
624 | } | |
625 | ||
626 | $ldb->commit; | |
627 | }; | |
628 | if (my $err = $@) { | |
629 | $ldb->rollback; | |
630 | die $err; | |
631 | } | |
632 | ||
633 | $mscount += $count; | |
634 | ||
635 | last if $mscount >= $maxmails; | |
636 | ||
637 | } while ($count >= $maxcount); | |
638 | } | |
639 | ||
640 | sub sync_generic_mtime_db { | |
641 | my ($ldb, $rdb, $ni, $table, $selectfunc, $mergefunc) = @_; | |
642 | ||
643 | my ($cnew, $cold) = (0, 0); | |
644 | ||
645 | my $ctime = PMG::DBTools::get_remote_time($rdb); | |
646 | ||
647 | PMG::DBTools::create_clusterinfo_default($ldb, $ni->{cid}, "lastmt_$table", 0, undef); | |
648 | ||
649 | my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $ni->{cid}, "lastmt_$table"); | |
650 | ||
4c93256a | 651 | my $sql_cmd = $selectfunc->($ctime, $lastmt); |
9430b6d4 | 652 | |
4c93256a | 653 | my $sth = $rdb->prepare($sql_cmd); |
9430b6d4 DM |
654 | |
655 | $sth->execute(); | |
656 | ||
4c93256a DM |
657 | eval { |
658 | # use transaction to speedup things | |
659 | my $max = 1000; # UPDATE MAX ENTRIES AT ONCE | |
660 | $ldb->begin_work; | |
661 | my $count = 0; | |
662 | while (my $ref = $sth->fetchrow_hashref()) { | |
663 | if (++$count >= $max) { | |
664 | $count = 0; | |
665 | $ldb->commit; | |
666 | $ldb->begin_work; | |
9430b6d4 | 667 | } |
4c93256a | 668 | $mergefunc->($ldb, $ref, \$cnew, \$cold); |
9430b6d4 DM |
669 | } |
670 | }; | |
4c93256a DM |
671 | if (my $err = $@) { |
672 | $ldb->rollback; | |
673 | die $err; | |
9430b6d4 DM |
674 | } |
675 | ||
9430b6d4 DM |
676 | PMG::DBTools::write_maxint_clusterinfo($ldb, $ni->{cid}, "lastmt_$table", $ctime); |
677 | ||
678 | return ($cnew, $cold); | |
679 | } | |
680 | ||
681 | sub sync_greylist_db { | |
682 | my ($dbh, $rdb, $ni) = @_; | |
683 | ||
684 | my $selectfunc = sub { | |
685 | my ($ctime, $lastmt) = @_; | |
686 | return "SELECT * from CGreylist WHERE extime >= $ctime AND " . | |
687 | "mtime >= $lastmt AND CID != 0"; | |
688 | }; | |
689 | ||
690 | my $mergefunc = sub { | |
691 | my ($ldb, $ref, $cnewref, $coldref) = @_; | |
692 | ||
693 | my $sth = $ldb->prepare("SELECT merge_greylist(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) AS newcount"); | |
694 | ||
695 | $sth->execute($ref->{ipnet}, $ref->{host}, $ref->{sender}, $ref->{receiver}, | |
696 | $ref->{instance}, $ref->{rctime}, $ref->{extime}, $ref->{delay}, $ref->{blocked}, | |
697 | $ref->{passed}, 0, $ref->{cid}); | |
698 | ||
699 | my $res = $sth->fetchrow_hashref(); | |
700 | ||
701 | $sth->finish(); | |
702 | ||
703 | if ($res->{newcount}) { | |
704 | $$cnewref++; | |
705 | } else { | |
706 | $$coldref++; | |
707 | } | |
708 | }; | |
709 | ||
710 | return sync_generic_mtime_db($dbh, $rdb, $ni, 'CGreylist', $selectfunc, $mergefunc); | |
711 | } | |
712 | ||
713 | sub sync_userprefs_db { | |
714 | my ($dbh, $rdb, $ni) = @_; | |
715 | ||
716 | my $selectfunc = sub { | |
717 | my ($ctime, $lastmt) = @_; | |
718 | ||
719 | return "SELECT * from UserPrefs WHERE mtime >= $lastmt"; | |
720 | }; | |
721 | ||
471b05ed DM |
722 | my $update_sth = $dbh->prepare( |
723 | "INSERT INTO UserPrefs (PMail, Name, Data, MTime) " . | |
724 | 'VALUES ($1, $2, $3, 0) ' . | |
725 | 'ON CONFLICT (PMail, Name) DO UPDATE SET ' . | |
726 | 'MTime = 0, ' . # this is just a copy from somewhere else, not modified | |
727 | 'Data = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN excluded.Data ELSE $3 END'); | |
728 | ||
9430b6d4 DM |
729 | my $mergefunc = sub { |
730 | my ($ldb, $ref, $cnewref, $coldref) = @_; | |
731 | ||
471b05ed DM |
732 | $update_sth->execute($ref->{pmail}, $ref->{name}, $ref->{data}); |
733 | $$coldref++; | |
9430b6d4 DM |
734 | }; |
735 | ||
736 | return sync_generic_mtime_db($dbh, $rdb, $ni, 'UserPrefs', $selectfunc, $mergefunc); | |
737 | } | |
738 | ||
739 | sub sync_domainstat_db { | |
740 | my ($dbh, $rdb, $ni) = @_; | |
741 | ||
742 | my $selectfunc = sub { | |
743 | my ($ctime, $lastmt) = @_; | |
744 | return "SELECT * from DomainStat WHERE mtime >= $lastmt"; | |
745 | }; | |
746 | ||
747 | my $mergefunc = sub { | |
748 | my ($ldb, $ref, $cnewref, $coldref) = @_; | |
749 | ||
750 | my $sth = $ldb->prepare ( | |
751 | "UPDATE Domainstat " . | |
752 | "SET CountIn = $ref->{countin}, CountOut = $ref->{countout}, " . | |
753 | "BytesIn = $ref->{bytesin}, BytesOut = $ref->{bytesout}, " . | |
754 | "VirusIn = $ref->{virusin}, VirusOut = $ref->{virusout}, " . | |
755 | "SpamIn = $ref->{spamin}, SpamOut = $ref->{spamout}, " . | |
756 | "BouncesIn = $ref->{bouncesin}, BouncesOut = $ref->{bouncesout}, " . | |
757 | "PTimeSum = $ref->{ptimesum}, MTime = $ref->{mtime} " . | |
758 | "WHERE Time = ? AND Domain = ?"); | |
759 | ||
760 | $sth->execute($ref->{time}, $ref->{domain}); | |
761 | ||
762 | my $rows = $sth->rows; | |
763 | ||
764 | $sth->finish(); | |
765 | ||
766 | if ($rows) { | |
767 | $$coldref++; | |
768 | } else { | |
769 | $ldb->do( | |
770 | "INSERT INTO Domainstat " . | |
771 | "(Time,Domain,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut," . | |
772 | "BouncesIn,BouncesOut,PTimeSum,Mtime) " . | |
773 | "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)", undef, | |
774 | $ref->{time}, $ref->{domain}, $ref->{countin}, $ref->{countout}, | |
775 | $ref->{bytesin}, $ref->{bytesout}, | |
776 | $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout}, | |
777 | $ref->{bouncesin}, $ref->{bouncesout}, $ref->{ptimesum}, $ref->{mtime}); | |
778 | $$cnewref++; | |
779 | } | |
780 | }; | |
781 | ||
782 | return sync_generic_mtime_db($dbh, $rdb, $ni, 'DomainStat', $selectfunc, $mergefunc); | |
783 | } | |
784 | ||
785 | sub sync_dailystat_db { | |
786 | my ($dbh, $rdb, $ni) = @_; | |
787 | ||
788 | my $selectfunc = sub { | |
789 | my ($ctime, $lastmt) = @_; | |
790 | return "SELECT * from DailyStat WHERE mtime >= $lastmt"; | |
791 | }; | |
792 | ||
793 | my $mergefunc = sub { | |
794 | my ($ldb, $ref, $cnewref, $coldref) = @_; | |
795 | ||
796 | my $sth = $ldb->prepare( | |
797 | "UPDATE DailyStat " . | |
798 | "SET CountIn = $ref->{countin}, CountOut = $ref->{countout}, " . | |
799 | "BytesIn = $ref->{bytesin}, BytesOut = $ref->{bytesout}, " . | |
800 | "VirusIn = $ref->{virusin}, VirusOut = $ref->{virusout}, " . | |
801 | "SpamIn = $ref->{spamin}, SpamOut = $ref->{spamout}, " . | |
802 | "BouncesIn = $ref->{bouncesin}, BouncesOut = $ref->{bouncesout}, " . | |
803 | "GreylistCount = $ref->{greylistcount}, SPFCount = $ref->{spfcount}, " . | |
804 | "RBLCount = $ref->{rblcount}, " . | |
805 | "PTimeSum = $ref->{ptimesum}, MTime = $ref->{mtime} " . | |
806 | "WHERE Time = ?"); | |
807 | ||
808 | $sth->execute($ref->{time}); | |
809 | ||
810 | my $rows = $sth->rows; | |
811 | ||
812 | $sth->finish(); | |
813 | ||
814 | if ($rows) { | |
815 | $$coldref++; | |
816 | } else { | |
817 | $ldb->do ( | |
818 | "INSERT INTO DailyStat " . | |
819 | "(Time,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut," . | |
820 | "BouncesIn,BouncesOut,GreylistCount,SPFCount,RBLCount,PTimeSum,Mtime) " . | |
821 | "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", undef, | |
822 | $ref->{time}, $ref->{countin}, $ref->{countout}, | |
823 | $ref->{bytesin}, $ref->{bytesout}, | |
824 | $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout}, | |
825 | $ref->{bouncesin}, $ref->{bouncesout}, $ref->{greylistcount}, | |
826 | $ref->{spfcount}, $ref->{rblcount}, $ref->{ptimesum}, $ref->{mtime}); | |
827 | $$cnewref++; | |
828 | } | |
829 | }; | |
830 | ||
831 | return sync_generic_mtime_db($dbh, $rdb, $ni, 'DailyStat', $selectfunc, $mergefunc); | |
832 | } | |
833 | ||
834 | sub sync_virusinfo_db { | |
835 | my ($dbh, $rdb, $ni) = @_; | |
836 | ||
837 | my $selectfunc = sub { | |
838 | my ($ctime, $lastmt) = @_; | |
839 | return "SELECT * from VirusInfo WHERE mtime >= $lastmt"; | |
840 | }; | |
841 | ||
842 | my $mergefunc = sub { | |
843 | my ($ldb, $ref, $cnewref, $coldref) = @_; | |
844 | ||
845 | my $sth = $ldb->prepare( | |
846 | "UPDATE VirusInfo SET Count = ? , MTime = ? " . | |
847 | "WHERE Time = ? AND Name = ?"); | |
848 | ||
849 | $sth->execute($ref->{count}, $ref->{mtime}, $ref->{time}, $ref->{name}); | |
850 | ||
851 | my $rows = $sth->rows; | |
852 | ||
853 | $sth->finish (); | |
854 | ||
855 | if ($rows) { | |
856 | $$coldref++; | |
857 | } else { | |
858 | $ldb->do ("INSERT INTO VirusInfo (Time,Name,Count,MTime) " . | |
859 | "VALUES (?,?,?,?)", undef, | |
860 | $ref->{time}, $ref->{name}, $ref->{count}, $ref->{mtime}); | |
861 | $$cnewref++; | |
862 | } | |
863 | }; | |
864 | ||
865 | return sync_generic_mtime_db($dbh, $rdb, $ni, 'VirusInfo', $selectfunc, $mergefunc); | |
866 | } | |
867 | ||
868 | sub sync_deleted_nodes_from_master { | |
869 | my ($ldb, $masterdb, $cinfo, $masterni, $rsynctime_ref) = @_; | |
870 | ||
871 | my $rsynctime = 0; | |
872 | ||
873 | my $cid_hash = {}; # fast lookup | |
5c6e6eeb | 874 | foreach my $ni (values %{$cinfo->{ids}}) { |
9430b6d4 DM |
875 | $cid_hash->{$ni->{cid}} = $ni; |
876 | } | |
877 | ||
878 | my $spooldir = $PMG::MailQueue::spooldir; | |
879 | ||
5c6e6eeb DM |
880 | my $maxcid = $cinfo->{master}->{maxcid} // 0; |
881 | ||
882 | for (my $rcid = 1; $rcid <= $maxcid; $rcid++) { | |
9430b6d4 DM |
883 | next if $cid_hash->{$rcid}; |
884 | ||
885 | my $done_marker = "$spooldir/cluster/$rcid/.synced-deleted-node"; | |
886 | ||
887 | next if -f $done_marker; # already synced | |
888 | ||
889 | syslog('info', "syncing deleted node $rcid from master '$masterni->{ip}'"); | |
890 | ||
891 | my $starttime = [ gettimeofday() ]; | |
892 | sync_spooldir($masterni->{ip}, $masterni->{name}, $rcid); | |
893 | $$rsynctime_ref += tv_interval($starttime); | |
894 | ||
895 | my $fake_ni = { | |
896 | ip => $masterni->{ip}, | |
897 | name => $masterni->{name}, | |
898 | cid => $rcid, | |
899 | }; | |
900 | ||
901 | sync_quarantine_db($ldb, $masterdb, $fake_ni); | |
902 | ||
903 | sync_statistic_db ($ldb, $masterdb, $fake_ni); | |
904 | ||
905 | open(my $fh, ">>", $done_marker); | |
906 | } | |
907 | } | |
908 | ||
909 | ||
0854fb22 | 910 | 1; |