]>
Commit | Line | Data |
---|---|---|
0854fb22 DM |
1 | package PMG::Cluster; |
2 | ||
3 | use strict; | |
4 | use warnings; | |
45e68618 | 5 | use Data::Dumper; |
0854fb22 | 6 | use Socket; |
cfdf6608 | 7 | use File::Path; |
9430b6d4 | 8 | use Time::HiRes qw (gettimeofday tv_interval); |
45e68618 | 9 | |
a7c7cad7 | 10 | use PVE::SafeSyslog; |
0854fb22 DM |
11 | use PVE::Tools; |
12 | use PVE::INotify; | |
0757859a | 13 | use PVE::APIClient::LWP; |
0854fb22 | 14 | |
1fb2ab76 | 15 | use PMG::Utils; |
a7c7cad7 | 16 | use PMG::Config; |
9f67f5b3 | 17 | use PMG::ClusterConfig; |
db303db4 DM |
18 | use PMG::RuleDB; |
19 | use PMG::RuleCache; | |
9430b6d4 | 20 | use PMG::MailQueue; |
0757859a | 21 | use PMG::Fetchmail; |
0854fb22 | 22 | |
8737f93a DM |
23 | sub remote_node_ip { |
24 | my ($nodename, $noerr) = @_; | |
25 | ||
d8782874 | 26 | my $cinfo = PMG::ClusterConfig->new(); |
8737f93a | 27 | |
45e68618 | 28 | foreach my $entry (values %{$cinfo->{ids}}) { |
8737f93a DM |
29 | if ($entry->{name} eq $nodename) { |
30 | my $ip = $entry->{ip}; | |
31 | return $ip if !wantarray; | |
32 | my $family = PVE::Tools::get_host_address_family($ip); | |
33 | return ($ip, $family); | |
34 | } | |
35 | } | |
36 | ||
37 | # fallback: try to get IP by other means | |
9f67f5b3 | 38 | return PMG::Utils::lookup_node_ip($nodename, $noerr); |
8737f93a DM |
39 | } |
40 | ||
d2e43f9e DM |
41 | sub get_master_node { |
42 | my ($cinfo) = @_; | |
43 | ||
d8782874 | 44 | $cinfo = PMG::ClusterConfig->new() if !$cinfo; |
d2e43f9e DM |
45 | |
46 | return $cinfo->{master}->{name} if defined($cinfo->{master}); | |
47 | ||
48 | return 'localhost'; | |
49 | } | |
50 | ||
cba17aeb DM |
51 | sub read_local_ssl_cert_fingerprint { |
52 | my $cert_path = "/etc/pmg/pmg-api.pem"; | |
0854fb22 | 53 | |
cba17aeb DM |
54 | my $cert; |
55 | eval { | |
56 | my $bio = Net::SSLeay::BIO_new_file($cert_path, 'r'); | |
57 | $cert = Net::SSLeay::PEM_read_bio_X509($bio); | |
58 | Net::SSLeay::BIO_free($bio); | |
59 | }; | |
60 | if (my $err = $@) { | |
61 | die "unable to read certificate '$cert_path' - $err\n"; | |
62 | } | |
0854fb22 | 63 | |
cba17aeb DM |
64 | if (!defined($cert)) { |
65 | die "unable to read certificate '$cert_path' - got empty value\n"; | |
66 | } | |
67 | ||
68 | my $fp; | |
69 | eval { | |
70 | $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256'); | |
71 | }; | |
72 | if (my $err = $@) { | |
73 | die "unable to get fingerprint for '$cert_path' - $err\n"; | |
74 | } | |
0854fb22 | 75 | |
cba17aeb DM |
76 | if (!defined($fp) || $fp eq '') { |
77 | die "unable to get fingerprint for '$cert_path' - got empty value\n"; | |
78 | } | |
0854fb22 | 79 | |
cba17aeb DM |
80 | return $fp; |
81 | } | |
0854fb22 | 82 | |
cba17aeb DM |
83 | my $hostrsapubkey_fn = '/etc/ssh/ssh_host_rsa_key.pub'; |
84 | my $rootrsakey_fn = '/root/.ssh/id_rsa'; | |
85 | my $rootrsapubkey_fn = '/root/.ssh/id_rsa.pub'; | |
0854fb22 | 86 | |
cba17aeb DM |
87 | sub read_local_cluster_info { |
88 | ||
89 | my $res = {}; | |
90 | ||
91 | my $hostrsapubkey = PVE::Tools::file_read_firstline($hostrsapubkey_fn); | |
92 | $hostrsapubkey =~ s/^.*ssh-rsa\s+//i; | |
93 | $hostrsapubkey =~ s/\s+root\@\S+\s*$//i; | |
94 | ||
95 | die "unable to parse ${hostrsapubkey_fn}\n" | |
96 | if $hostrsapubkey !~ m/^[A-Za-z0-9\.\/\+]{200,}$/; | |
0854fb22 DM |
97 | |
98 | my $nodename = PVE::INotify::nodename(); | |
99 | ||
cba17aeb | 100 | $res->{name} = $nodename; |
0854fb22 | 101 | |
cba17aeb | 102 | $res->{ip} = PMG::Utils::lookup_node_ip($nodename); |
0854fb22 | 103 | |
cba17aeb | 104 | $res->{hostrsapubkey} = $hostrsapubkey; |
0854fb22 | 105 | |
cba17aeb DM |
106 | if (! -f $rootrsapubkey_fn) { |
107 | unlink $rootrsakey_fn; | |
108 | my $cmd = ['ssh-keygen', '-t', 'rsa', '-N', '', '-b', '2048', | |
109 | '-f', $rootrsakey_fn]; | |
1fb2ab76 | 110 | PMG::Utils::run_silent_cmd($cmd); |
cba17aeb DM |
111 | } |
112 | ||
113 | my $rootrsapubkey = PVE::Tools::file_read_firstline($rootrsapubkey_fn); | |
114 | $rootrsapubkey =~ s/^.*ssh-rsa\s+//i; | |
115 | $rootrsapubkey =~ s/\s+root\@\S+\s*$//i; | |
116 | ||
117 | die "unable to parse ${rootrsapubkey_fn}\n" | |
118 | if $rootrsapubkey !~ m/^[A-Za-z0-9\.\/\+]{200,}$/; | |
119 | ||
120 | $res->{rootrsapubkey} = $rootrsapubkey; | |
121 | ||
122 | $res->{fingerprint} = read_local_ssl_cert_fingerprint(); | |
123 | ||
124 | return $res; | |
125 | } | |
126 | ||
127 | # X509 Certificate cache helper | |
128 | ||
129 | my $cert_cache_nodes = {}; | |
130 | my $cert_cache_timestamp = time(); | |
131 | my $cert_cache_fingerprints = {}; | |
0854fb22 | 132 | |
cba17aeb | 133 | sub update_cert_cache { |
0854fb22 | 134 | |
cba17aeb DM |
135 | $cert_cache_timestamp = time(); |
136 | ||
137 | $cert_cache_fingerprints = {}; | |
138 | $cert_cache_nodes = {}; | |
139 | ||
d8782874 | 140 | my $cinfo = PMG::ClusterConfig->new(); |
cba17aeb DM |
141 | |
142 | foreach my $entry (values %{$cinfo->{ids}}) { | |
143 | my $node = $entry->{name}; | |
144 | my $fp = $entry->{fingerprint}; | |
145 | if ($node && $fp) { | |
146 | $cert_cache_fingerprints->{$fp} = 1; | |
147 | $cert_cache_nodes->{$node} = $fp; | |
0854fb22 DM |
148 | } |
149 | } | |
150 | } | |
151 | ||
152 | # load and cache cert fingerprint once | |
153 | sub initialize_cert_cache { | |
154 | my ($node) = @_; | |
155 | ||
cba17aeb | 156 | update_cert_cache() |
0854fb22 DM |
157 | if defined($node) && !defined($cert_cache_nodes->{$node}); |
158 | } | |
159 | ||
160 | sub check_cert_fingerprint { | |
161 | my ($cert) = @_; | |
162 | ||
163 | # clear cache every 30 minutes at least | |
cba17aeb | 164 | update_cert_cache() if time() - $cert_cache_timestamp >= 60*30; |
0854fb22 DM |
165 | |
166 | # get fingerprint of server certificate | |
167 | my $fp; | |
168 | eval { | |
169 | $fp = Net::SSLeay::X509_get_fingerprint($cert, 'sha256'); | |
170 | }; | |
171 | return 0 if $@ || !defined($fp) || $fp eq ''; # error | |
172 | ||
173 | my $check = sub { | |
174 | for my $expected (keys %$cert_cache_fingerprints) { | |
175 | return 1 if $fp eq $expected; | |
176 | } | |
177 | return 0; | |
178 | }; | |
179 | ||
cba17aeb | 180 | return 1 if $check->(); |
0854fb22 DM |
181 | |
182 | # clear cache and retry at most once every minute | |
183 | if (time() - $cert_cache_timestamp >= 60) { | |
184 | syslog ('info', "Could not verify remote node certificate '$fp' with list of pinned certificates, refreshing cache"); | |
185 | update_cert_cache(); | |
cba17aeb | 186 | return $check->(); |
0854fb22 DM |
187 | } |
188 | ||
189 | return 0; | |
190 | } | |
191 | ||
58072364 DM |
192 | my $sshglobalknownhosts = "/etc/ssh/ssh_known_hosts2"; |
193 | my $rootsshauthkeys = "/root/.ssh/authorized_keys"; | |
194 | my $ssh_rsa_id = "/root/.ssh/id_rsa.pub"; | |
195 | ||
196 | sub update_ssh_keys { | |
197 | my ($cinfo) = @_; | |
198 | ||
809dd92a | 199 | my $old = ''; |
58072364 | 200 | my $data = ''; |
809dd92a | 201 | |
58072364 DM |
202 | foreach my $node (values %{$cinfo->{ids}}) { |
203 | $data .= "$node->{ip} ssh-rsa $node->{hostrsapubkey}\n"; | |
204 | $data .= "$node->{name} ssh-rsa $node->{hostrsapubkey}\n"; | |
205 | } | |
206 | ||
809dd92a DM |
207 | $old = PVE::Tools::file_get_contents($sshglobalknownhosts, 1024*1024) |
208 | if -f $sshglobalknownhosts; | |
209 | ||
210 | PVE::Tools::file_set_contents($sshglobalknownhosts, $data) | |
211 | if $old ne $data; | |
58072364 DM |
212 | |
213 | $data = ''; | |
809dd92a | 214 | $old = ''; |
58072364 DM |
215 | |
216 | # always add ourself | |
217 | if (-f $ssh_rsa_id) { | |
218 | my $pub = PVE::Tools::file_get_contents($ssh_rsa_id); | |
219 | chomp($pub); | |
220 | $data .= "$pub\n"; | |
221 | } | |
222 | ||
223 | foreach my $node (values %{$cinfo->{ids}}) { | |
224 | $data .= "ssh-rsa $node->{rootrsapubkey} root\@$node->{name}\n"; | |
225 | } | |
226 | ||
227 | if (-f $rootsshauthkeys) { | |
a7c7cad7 DM |
228 | my $mykey = PVE::Tools::file_get_contents($rootsshauthkeys, 128*1024); |
229 | chomp($mykey); | |
230 | $data .= "$mykey\n"; | |
58072364 DM |
231 | } |
232 | ||
233 | my $newdata = ""; | |
234 | my $vhash = {}; | |
235 | my @lines = split(/\n/, $data); | |
236 | foreach my $line (@lines) { | |
237 | if ($line !~ /^#/ && $line =~ m/(^|\s)ssh-(rsa|dsa)\s+(\S+)\s+\S+$/) { | |
238 | next if $vhash->{$3}++; | |
239 | } | |
240 | $newdata .= "$line\n"; | |
241 | } | |
242 | ||
809dd92a DM |
243 | $old = PVE::Tools::file_get_contents($rootsshauthkeys, 1024*1024) |
244 | if -f $rootsshauthkeys; | |
245 | ||
246 | PVE::Tools::file_set_contents($rootsshauthkeys, $newdata, 0600) | |
247 | if $old ne $newdata; | |
58072364 DM |
248 | } |
249 | ||
a7c7cad7 DM |
250 | my $cfgdir = '/etc/pmg'; |
251 | my $syncdir = "$cfgdir/master"; | |
252 | ||
253 | my $cond_commit_synced_file = sub { | |
254 | my ($filename, $dstfn) = @_; | |
255 | ||
256 | $dstfn = "$cfgdir/$filename" if !defined($dstfn); | |
257 | my $srcfn = "$syncdir/$filename"; | |
258 | ||
259 | if (! -f $srcfn) { | |
260 | unlink $dstfn; | |
261 | return; | |
262 | } | |
263 | ||
264 | my $new = PVE::Tools::file_get_contents($srcfn, 1024*1024); | |
265 | ||
266 | if (-f $dstfn) { | |
267 | my $old = PVE::Tools::file_get_contents($dstfn, 1024*1024); | |
268 | return 0 if $new eq $old; | |
269 | } | |
270 | ||
1dc946d4 DM |
271 | # set mtime (touch) to avoid time drift problems |
272 | utime(undef, undef, $srcfn); | |
273 | ||
a7c7cad7 DM |
274 | rename($srcfn, $dstfn) || |
275 | die "cond_rename_file '$filename' failed - $!\n"; | |
276 | ||
277 | print STDERR "updated $dstfn\n"; | |
278 | ||
279 | return 1; | |
280 | }; | |
281 | ||
c9dae0df DM |
282 | my $rsync_command = sub { |
283 | my ($host_key_alias, @args) = @_; | |
284 | ||
285 | my $ssh_cmd = '--rsh=ssh -l root -o BatchMode=yes'; | |
286 | $ssh_cmd .= " -o HostKeyAlias=${host_key_alias}" if $host_key_alias; | |
287 | ||
288 | my $cmd = ['rsync', $ssh_cmd, '-q', @args]; | |
289 | ||
290 | return $cmd; | |
291 | }; | |
292 | ||
293 | sub sync_quarantine_files { | |
294 | my ($host_ip, $host_name, $flistname) = @_; | |
295 | ||
9430b6d4 DM |
296 | my $spooldir = $PMG::MailQueue::spooldir; |
297 | ||
c9dae0df DM |
298 | my $cmd = $rsync_command->( |
299 | $host_name, '--timeout', '10', "${host_ip}:$spooldir", $spooldir, | |
300 | '--files-from', $flistname); | |
301 | ||
9430b6d4 | 302 | PVE::Tools::run_command($cmd); |
c9dae0df DM |
303 | } |
304 | ||
305 | sub sync_spooldir { | |
306 | my ($host_ip, $host_name, $rcid) = @_; | |
307 | ||
9430b6d4 DM |
308 | my $spooldir = $PMG::MailQueue::spooldir; |
309 | ||
c9dae0df DM |
310 | mkdir "$spooldir/cluster/"; |
311 | my $syncdir = "$spooldir/cluster/$rcid"; | |
312 | mkdir $syncdir; | |
313 | ||
314 | my $cmd = $rsync_command->( | |
315 | $host_name, '-aq', '--timeout', '10', "${host_ip}:$syncdir/", $syncdir); | |
316 | ||
317 | foreach my $incl (('spam/', 'spam/*', 'spam/*/*', 'virus/', 'virus/*', 'virus/*/*')) { | |
318 | push @$cmd, '--include', $incl; | |
319 | } | |
320 | ||
321 | push @$cmd, '--exclude', '*'; | |
322 | ||
323 | PVE::Tools::run_command($cmd); | |
324 | } | |
325 | ||
326 | sub sync_master_quar { | |
327 | my ($host_ip, $host_name) = @_; | |
328 | ||
9430b6d4 DM |
329 | my $spooldir = $PMG::MailQueue::spooldir; |
330 | ||
c9dae0df DM |
331 | my $syncdir = "$spooldir/cluster/"; |
332 | mkdir $syncdir; | |
333 | ||
334 | my $cmd = $rsync_command->( | |
335 | $host_name, '-aq', '--timeout', '10', "${host_ip}:$syncdir", $syncdir); | |
336 | ||
337 | PVE::Tools::run_command($cmd); | |
338 | } | |
339 | ||
58072364 | 340 | sub sync_config_from_master { |
809ae8f4 | 341 | my ($master_name, $master_ip, $noreload) = @_; |
58072364 | 342 | |
58072364 | 343 | mkdir $syncdir; |
a7c7cad7 | 344 | File::Path::remove_tree($syncdir, {keep_root => 1}); |
58072364 | 345 | |
a7c7cad7 DM |
346 | my $sa_conf_dir = "/etc/mail/spamassassin"; |
347 | my $sa_custom_cf = "custom.cf"; | |
58072364 | 348 | |
c9dae0df | 349 | my $cmd = $rsync_command->( |
f4b7112b | 350 | $master_name, '-aq', |
c9dae0df DM |
351 | "${master_ip}:$cfgdir/* ${sa_conf_dir}/${sa_custom_cf}", |
352 | "$syncdir/", | |
f4b7112b | 353 | '--exclude', 'master/', |
c9dae0df DM |
354 | '--exclude', '*~', |
355 | '--exclude', '*.db', | |
356 | '--exclude', 'pmg-api.pem', | |
357 | '--exclude', 'pmg-tls.pem', | |
358 | ); | |
58072364 DM |
359 | |
360 | my $errmsg = "syncing master configuration from '${master_ip}' failed"; | |
361 | PVE::Tools::run_command($cmd, errmsg => $errmsg); | |
a7c7cad7 DM |
362 | |
363 | # verify that the remote host is cluster master | |
364 | open (my $fh, '<', "$syncdir/cluster.conf") || | |
365 | die "unable to open synced cluster.conf - $!\n"; | |
a7c7cad7 | 366 | |
809ae8f4 DM |
367 | my $cinfo = PMG::ClusterConfig::read_cluster_conf('cluster.conf', $fh); |
368 | ||
369 | if (!$cinfo->{master} || ($cinfo->{master}->{ip} ne $master_ip)) { | |
a7c7cad7 DM |
370 | die "host '$master_ip' is not cluster master\n"; |
371 | } | |
372 | ||
809ae8f4 DM |
373 | my $role = $cinfo->{'local'}->{type} // '-'; |
374 | die "local node '$cinfo->{local}->{name}' not part of cluster\n" | |
a7c7cad7 DM |
375 | if $role eq '-'; |
376 | ||
809ae8f4 | 377 | die "local node '$cinfo->{local}->{name}' is new cluster master\n" |
a7c7cad7 DM |
378 | if $role eq 'master'; |
379 | ||
a7c7cad7 | 380 | $cond_commit_synced_file->('cluster.conf'); |
a7c7cad7 | 381 | |
809dd92a DM |
382 | update_ssh_keys($cinfo); # rewrite ssh keys |
383 | ||
0757859a DM |
384 | PMG::Fetchmail::update_fetchmail_default(0); # disable on slave |
385 | ||
a7c7cad7 DM |
386 | my $files = [ |
387 | 'pmg-authkey.key', | |
388 | 'pmg-authkey.pub', | |
389 | 'pmg-csrf.key', | |
390 | 'ldap.conf', | |
391 | 'user.conf', | |
7cac3e28 DM |
392 | 'domains', |
393 | 'mynetworks', | |
394 | 'transport', | |
959aaeba | 395 | 'tls_policy', |
fd6feef4 | 396 | 'fetchmailrc', |
a7c7cad7 DM |
397 | ]; |
398 | ||
399 | foreach my $filename (@$files) { | |
400 | $cond_commit_synced_file->($filename); | |
401 | } | |
402 | ||
0757859a | 403 | |
a7c7cad7 DM |
404 | my $force_restart = {}; |
405 | ||
406 | if ($cond_commit_synced_file->($sa_custom_cf, "${sa_conf_dir}/${sa_custom_cf}")) { | |
407 | $force_restart->{spam} = 1; | |
408 | } | |
409 | ||
410 | $cond_commit_synced_file->('pmg.conf'); | |
f4b7112b DM |
411 | |
412 | # sync user templates files/symlinks (not recursive) | |
413 | my $srcdir = "$syncdir/templates"; | |
414 | if (-d $srcdir) { | |
415 | my $dstdir = "$cfgdir/templates"; | |
416 | mkdir $dstdir; | |
417 | my $names_hash = {}; | |
418 | foreach my $fn (<$srcdir/*>) { | |
419 | next if $fn !~ m|^($srcdir/(.*))$|; | |
420 | $fn = $1; # untaint; | |
421 | my $name = $2; | |
422 | $names_hash->{$name} = 1; | |
423 | my $target = "$dstdir/$name"; | |
424 | if (-f $fn) { | |
425 | $cond_commit_synced_file->("templates/$name", $target); | |
426 | } elsif (-l $fn) { | |
427 | warn "update $target failed - $!\n" if !rename($fn, $target); | |
428 | } | |
429 | } | |
430 | # remove vanished files | |
431 | foreach my $fn (<$dstdir/*>) { | |
432 | next if $fn !~ m|^($dstdir/(.*))$|; | |
433 | $fn = $1; # untaint; | |
434 | my $name = $2; | |
435 | next if $names_hash->{$name}; | |
436 | warn "unlink $fn failed - $!\n" if !unlink($fn); | |
437 | } | |
438 | } | |
58072364 DM |
439 | } |
440 | ||
db303db4 DM |
441 | sub sync_ruledb_from_master { |
442 | my ($ldb, $rdb, $ni, $ticket) = @_; | |
443 | ||
444 | my $ruledb = PMG::RuleDB->new($ldb); | |
445 | my $rulecache = PMG::RuleCache->new($ruledb); | |
446 | ||
447 | my $conn = PVE::APIClient::LWP->new( | |
448 | ticket => $ticket, | |
449 | cookie_name => 'PMGAuthCookie', | |
450 | host => $ni->{ip}, | |
451 | cached_fingerprints => { | |
452 | $ni->{fingerprint} => 1, | |
453 | }); | |
454 | ||
455 | my $digest = $conn->get("/config/ruledb/digest", {}); | |
456 | ||
457 | return if $digest eq $rulecache->{digest}; # no changes | |
458 | ||
459 | syslog('info', "detected rule database changes - starting sync from '$ni->{ip}'"); | |
460 | ||
461 | eval { | |
462 | $ldb->begin_work; | |
463 | ||
464 | $ldb->do("DELETE FROM Rule"); | |
465 | $ldb->do("DELETE FROM RuleGroup"); | |
466 | $ldb->do("DELETE FROM ObjectGroup"); | |
467 | $ldb->do("DELETE FROM Object"); | |
468 | $ldb->do("DELETE FROM Attribut"); | |
469 | ||
470 | eval { | |
471 | $rdb->begin_work; | |
472 | ||
473 | # read a consistent snapshot | |
474 | $rdb->do("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); | |
475 | ||
476 | PMG::DBTools::copy_table($ldb, $rdb, "Rule"); | |
477 | PMG::DBTools::copy_table($ldb, $rdb, "RuleGroup"); | |
478 | PMG::DBTools::copy_table($ldb, $rdb, "ObjectGroup"); | |
479 | PMG::DBTools::copy_table($ldb, $rdb, "Object", 'value'); | |
480 | PMG::DBTools::copy_table($ldb, $rdb, "Attribut", 'value'); | |
481 | }; | |
482 | ||
483 | $rdb->rollback; # end transaction | |
484 | ||
485 | die $@ if $@; | |
486 | ||
487 | # update sequences | |
488 | ||
489 | $ldb->do("SELECT setval('rule_id_seq', max(id)+1) FROM Rule"); | |
490 | $ldb->do("SELECT setval('object_id_seq', max(id)+1) FROM Object"); | |
491 | $ldb->do("SELECT setval('objectgroup_id_seq', max(id)+1) FROM ObjectGroup"); | |
492 | ||
493 | $ldb->commit; | |
494 | }; | |
495 | if (my $err = $@) { | |
496 | $ldb->rollback; | |
497 | die $err; | |
498 | } | |
499 | ||
500 | syslog('info', "finished rule database sync from host '$ni->{ip}'"); | |
501 | } | |
502 | ||
9430b6d4 DM |
503 | sub sync_quarantine_db { |
504 | my ($ldb, $rdb, $ni, $rsynctime_ref) = @_; | |
505 | ||
506 | my $rcid = $ni->{cid}; | |
507 | ||
508 | my $maxmails = 100000; | |
509 | ||
510 | my $mscount = 0; | |
511 | ||
512 | my $ctime = PMG::DBTools::get_remote_time($rdb); | |
513 | ||
514 | my $maxcount = 1000; | |
515 | ||
516 | my $count; | |
517 | ||
518 | PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastid_CMailStore', -1, undef); | |
519 | ||
520 | do { # get new values | |
521 | ||
522 | $count = 0; | |
523 | ||
524 | my $flistname = "/tmp/quarantinefilelist.$$"; | |
525 | ||
526 | eval { | |
527 | $ldb->begin_work; | |
528 | ||
529 | open(my $flistfh, '>', $flistname) || | |
530 | die "unable to open file '$flistname' - $!\n"; | |
531 | ||
532 | my $lastid = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastid_CMailStore'); | |
533 | ||
534 | # sync CMailStore | |
535 | ||
536 | my $sth = $rdb->prepare( | |
537 | "SELECT * from CMailstore WHERE cid = ? AND rid > ? " . | |
538 | "ORDER BY cid,rid LIMIT ?"); | |
539 | $sth->execute($rcid, $lastid, $maxcount); | |
540 | ||
541 | my $maxid; | |
542 | my $callback = sub { | |
543 | my $ref = shift; | |
544 | $maxid = $ref->{rid}; | |
545 | print $flistfh "$ref->{file}\n"; | |
546 | }; | |
547 | ||
548 | my $attrs = [qw(cid rid time qtype bytes spamlevel info sender header file)]; | |
549 | $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CMailStore', $attrs, $callback); | |
550 | ||
551 | close($flistfh); | |
552 | ||
553 | my $starttime = [ gettimeofday() ]; | |
554 | sync_quarantine_files($ni->{ip}, $ni->{name}, $flistname); | |
555 | $$rsynctime_ref += tv_interval($starttime); | |
556 | ||
557 | if ($maxid) { | |
558 | # sync CMSReceivers | |
559 | ||
560 | $sth = $rdb->prepare( | |
561 | "SELECT * from CMSReceivers WHERE " . | |
562 | "CMailStore_CID = ? AND CMailStore_RID > ? " . | |
563 | "AND CMailStore_RID <= ?"); | |
564 | $sth->execute($rcid, $lastid, $maxid); | |
565 | ||
666b5e8f | 566 | $attrs = [qw(cmailstore_cid cmailstore_rid pmail receiver ticketid status mtime)]; |
9430b6d4 DM |
567 | PMG::DBTools::copy_selected_data($ldb, $sth, 'CMSReceivers', $attrs); |
568 | ||
569 | PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastid_CMailStore', $maxid); | |
570 | } | |
571 | ||
572 | $ldb->commit; | |
573 | }; | |
574 | my $err = $@; | |
575 | ||
576 | unlink $flistname; | |
577 | ||
578 | if ($err) { | |
579 | $ldb->rollback; | |
580 | die $err; | |
581 | } | |
582 | ||
583 | $mscount += $count; | |
584 | ||
585 | last if $mscount >= $maxmails; | |
586 | ||
587 | } while ($count >= $maxcount); | |
588 | ||
589 | PMG::DBTools::create_clusterinfo_default($ldb, $rcid, 'lastmt_CMSReceivers', 0, undef); | |
590 | ||
591 | eval { # synchronize status updates | |
592 | $ldb->begin_work; | |
593 | ||
594 | my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers'); | |
595 | ||
596 | my $sth = $rdb->prepare ("SELECT * from CMSReceivers WHERE mtime >= ? AND status != 'N'"); | |
597 | $sth->execute($lastmt); | |
598 | ||
599 | my $update_sth = $ldb->prepare( | |
600 | "UPDATE CMSReceivers SET status = ? WHERE " . | |
666b5e8f | 601 | "CMailstore_CID = ? AND CMailstore_RID = ? AND TicketID = ?"); |
9430b6d4 DM |
602 | while (my $ref = $sth->fetchrow_hashref()) { |
603 | $update_sth->execute($ref->{status}, $ref->{cmailstore_cid}, | |
666b5e8f | 604 | $ref->{cmailstore_rid}, $ref->{ticketid}); |
9430b6d4 DM |
605 | } |
606 | ||
607 | PMG::DBTools::write_maxint_clusterinfo($ldb, $rcid, 'lastmt_CMSReceivers', $ctime); | |
608 | ||
609 | $ldb->commit; | |
610 | }; | |
611 | if (my $err = $@) { | |
612 | $ldb->rollback; | |
613 | die $err; | |
614 | } | |
615 | ||
616 | return $mscount; | |
617 | } | |
618 | ||
619 | sub sync_statistic_db { | |
620 | my ($ldb, $rdb, $ni) = @_; | |
621 | ||
622 | my $rcid = $ni->{cid}; | |
623 | ||
624 | my $maxmails = 100000; | |
625 | ||
626 | my $mscount = 0; | |
627 | ||
628 | my $maxcount = 1000; | |
629 | ||
630 | my $count; | |
631 | ||
632 | PMG::DBTools::create_clusterinfo_default( | |
633 | $ldb, $rcid, 'lastid_CStatistic', -1, undef); | |
634 | ||
635 | do { # get new values | |
636 | ||
637 | $count = 0; | |
638 | ||
639 | eval { | |
640 | $ldb->begin_work; | |
641 | ||
642 | my $lastid = PMG::DBTools::read_int_clusterinfo( | |
643 | $ldb, $rcid, 'lastid_CStatistic'); | |
644 | ||
645 | # sync CStatistic | |
646 | ||
647 | my $sth = $rdb->prepare( | |
648 | "SELECT * from CStatistic " . | |
649 | "WHERE cid = ? AND rid > ? " . | |
650 | "ORDER BY cid, rid LIMIT ?"); | |
651 | $sth->execute($rcid, $lastid, $maxcount); | |
652 | ||
653 | my $maxid; | |
654 | my $callback = sub { | |
655 | my $ref = shift; | |
656 | $maxid = $ref->{rid}; | |
657 | }; | |
658 | ||
659 | my $attrs = [qw(cid rid time bytes direction spamlevel ptime virusinfo sender)]; | |
660 | $count += PMG::DBTools::copy_selected_data($ldb, $sth, 'CStatistic', $attrs, $callback); | |
661 | ||
662 | if ($maxid) { | |
663 | # sync CReceivers | |
664 | ||
665 | $sth = $rdb->prepare( | |
666 | "SELECT * from CReceivers WHERE " . | |
667 | "CStatistic_CID = ? AND CStatistic_RID > ? AND CStatistic_RID <= ?"); | |
668 | $sth->execute($rcid, $lastid, $maxid); | |
669 | ||
670 | $attrs = [qw(cstatistic_cid cstatistic_rid blocked receiver)]; | |
671 | PMG::DBTools::copy_selected_data($ldb, $sth, 'CReceivers', $attrs); | |
672 | ||
673 | PMG::DBTools::write_maxint_clusterinfo ($ldb, $rcid, 'lastid_CStatistic', $maxid); | |
674 | } | |
675 | ||
676 | $ldb->commit; | |
677 | }; | |
678 | if (my $err = $@) { | |
679 | $ldb->rollback; | |
680 | die $err; | |
681 | } | |
682 | ||
683 | $mscount += $count; | |
684 | ||
685 | last if $mscount >= $maxmails; | |
686 | ||
687 | } while ($count >= $maxcount); | |
e86b85a3 DM |
688 | |
689 | return $mscount; | |
9430b6d4 DM |
690 | } |
691 | ||
986eec31 | 692 | my $sync_generic_mtime_db = sub { |
9430b6d4 DM |
693 | my ($ldb, $rdb, $ni, $table, $selectfunc, $mergefunc) = @_; |
694 | ||
9430b6d4 DM |
695 | my $ctime = PMG::DBTools::get_remote_time($rdb); |
696 | ||
697 | PMG::DBTools::create_clusterinfo_default($ldb, $ni->{cid}, "lastmt_$table", 0, undef); | |
698 | ||
699 | my $lastmt = PMG::DBTools::read_int_clusterinfo($ldb, $ni->{cid}, "lastmt_$table"); | |
700 | ||
4c93256a | 701 | my $sql_cmd = $selectfunc->($ctime, $lastmt); |
9430b6d4 | 702 | |
4c93256a | 703 | my $sth = $rdb->prepare($sql_cmd); |
9430b6d4 DM |
704 | |
705 | $sth->execute(); | |
706 | ||
986eec31 DM |
707 | my $updates = 0; |
708 | ||
4c93256a DM |
709 | eval { |
710 | # use transaction to speedup things | |
711 | my $max = 1000; # UPDATE MAX ENTRIES AT ONCE | |
4c93256a DM |
712 | my $count = 0; |
713 | while (my $ref = $sth->fetchrow_hashref()) { | |
9d5bdd2a DM |
714 | $ldb->begin_work if !$count; |
715 | $mergefunc->($ref); | |
4c93256a DM |
716 | if (++$count >= $max) { |
717 | $count = 0; | |
718 | $ldb->commit; | |
9430b6d4 | 719 | } |
986eec31 | 720 | $updates++; |
9430b6d4 | 721 | } |
9d5bdd2a DM |
722 | |
723 | $ldb->commit if $count; | |
9430b6d4 | 724 | }; |
4c93256a DM |
725 | if (my $err = $@) { |
726 | $ldb->rollback; | |
727 | die $err; | |
9430b6d4 DM |
728 | } |
729 | ||
9430b6d4 DM |
730 | PMG::DBTools::write_maxint_clusterinfo($ldb, $ni->{cid}, "lastmt_$table", $ctime); |
731 | ||
986eec31 DM |
732 | return $updates; |
733 | }; | |
9430b6d4 | 734 | |
5e1408fd DM |
735 | sub sync_localstat_db { |
736 | my ($dbh, $rdb, $ni) = @_; | |
737 | ||
738 | my $rcid = $ni->{cid}; | |
739 | ||
740 | my $selectfunc = sub { | |
741 | my ($ctime, $lastmt) = @_; | |
742 | return "SELECT * from LocalStat WHERE mtime >= $lastmt AND cid = $rcid"; | |
743 | }; | |
744 | ||
745 | my $merge_sth = $dbh->prepare( | |
ebd19c79 DM |
746 | 'INSERT INTO LocalStat (Time, RBLCount, PregreetCount, CID, MTime) ' . |
747 | 'VALUES (?, ?, ?, ?, ?) ' . | |
5e1408fd | 748 | 'ON CONFLICT (Time, CID) DO UPDATE SET ' . |
ebd19c79 | 749 | 'RBLCount = excluded.RBLCount, PregreetCount = excluded.PregreetCount, MTime = excluded.MTime'); |
5e1408fd DM |
750 | |
751 | my $mergefunc = sub { | |
752 | my ($ref) = @_; | |
753 | ||
ebd19c79 | 754 | $merge_sth->execute($ref->{time}, $ref->{rblcount}, $ref->{pregreetcount}, $ref->{cid}, $ref->{mtime}); |
5e1408fd DM |
755 | }; |
756 | ||
757 | return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'LocalStat', $selectfunc, $mergefunc); | |
758 | } | |
759 | ||
9430b6d4 DM |
760 | sub sync_greylist_db { |
761 | my ($dbh, $rdb, $ni) = @_; | |
762 | ||
763 | my $selectfunc = sub { | |
764 | my ($ctime, $lastmt) = @_; | |
765 | return "SELECT * from CGreylist WHERE extime >= $ctime AND " . | |
766 | "mtime >= $lastmt AND CID != 0"; | |
767 | }; | |
768 | ||
2e049252 | 769 | my $merge_sth = $dbh->prepare($PMG::DBTools::cgreylist_merge_sql); |
9430b6d4 | 770 | my $mergefunc = sub { |
986eec31 | 771 | my ($ref) = @_; |
9430b6d4 | 772 | |
f413f920 DM |
773 | $merge_sth->execute( |
774 | $ref->{ipnet}, $ref->{host}, $ref->{sender}, $ref->{receiver}, | |
775 | $ref->{instance}, $ref->{rctime}, $ref->{extime}, $ref->{delay}, | |
776 | $ref->{blocked}, $ref->{passed}, 0, $ref->{cid}); | |
9430b6d4 DM |
777 | }; |
778 | ||
986eec31 | 779 | return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'CGreylist', $selectfunc, $mergefunc); |
9430b6d4 DM |
780 | } |
781 | ||
782 | sub sync_userprefs_db { | |
783 | my ($dbh, $rdb, $ni) = @_; | |
784 | ||
785 | my $selectfunc = sub { | |
786 | my ($ctime, $lastmt) = @_; | |
787 | ||
788 | return "SELECT * from UserPrefs WHERE mtime >= $lastmt"; | |
789 | }; | |
790 | ||
7d13157e | 791 | my $merge_sth = $dbh->prepare( |
471b05ed | 792 | "INSERT INTO UserPrefs (PMail, Name, Data, MTime) " . |
0527ea1a | 793 | 'VALUES (?, ?, ?, 0) ' . |
471b05ed | 794 | 'ON CONFLICT (PMail, Name) DO UPDATE SET ' . |
dbdf1298 | 795 | # Note: MTime = 0 ==> this is just a copy from somewhere else, not modified |
3a9ffd85 | 796 | 'MTime = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN 0 ELSE UserPrefs.MTime END, ' . |
0527ea1a | 797 | 'Data = CASE WHEN excluded.MTime >= UserPrefs.MTime THEN excluded.Data ELSE UserPrefs.Data END'); |
471b05ed | 798 | |
9430b6d4 | 799 | my $mergefunc = sub { |
986eec31 | 800 | my ($ref) = @_; |
9430b6d4 | 801 | |
7d13157e | 802 | $merge_sth->execute($ref->{pmail}, $ref->{name}, $ref->{data}); |
9430b6d4 DM |
803 | }; |
804 | ||
986eec31 | 805 | return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'UserPrefs', $selectfunc, $mergefunc); |
9430b6d4 DM |
806 | } |
807 | ||
808 | sub sync_domainstat_db { | |
809 | my ($dbh, $rdb, $ni) = @_; | |
810 | ||
811 | my $selectfunc = sub { | |
812 | my ($ctime, $lastmt) = @_; | |
813 | return "SELECT * from DomainStat WHERE mtime >= $lastmt"; | |
814 | }; | |
815 | ||
1c7ea32c DM |
816 | my $merge_sth = $dbh->prepare( |
817 | 'INSERT INTO Domainstat ' . | |
818 | '(Time,Domain,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' . | |
819 | 'BouncesIn,BouncesOut,PTimeSum,Mtime) ' . | |
820 | 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' . | |
821 | 'ON CONFLICT (Time, Domain) DO UPDATE SET ' . | |
822 | 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' . | |
823 | 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' . | |
824 | 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' . | |
825 | 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' . | |
826 | 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' . | |
827 | 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime'); | |
828 | ||
9430b6d4 | 829 | my $mergefunc = sub { |
986eec31 | 830 | my ($ref) = @_; |
9430b6d4 | 831 | |
1c7ea32c DM |
832 | $merge_sth->execute( |
833 | $ref->{time}, $ref->{domain}, $ref->{countin}, $ref->{countout}, | |
834 | $ref->{bytesin}, $ref->{bytesout}, | |
835 | $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout}, | |
836 | $ref->{bouncesin}, $ref->{bouncesout}, $ref->{ptimesum}, $ref->{mtime}); | |
9430b6d4 DM |
837 | }; |
838 | ||
986eec31 | 839 | return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DomainStat', $selectfunc, $mergefunc); |
9430b6d4 DM |
840 | } |
841 | ||
842 | sub sync_dailystat_db { | |
843 | my ($dbh, $rdb, $ni) = @_; | |
844 | ||
845 | my $selectfunc = sub { | |
846 | my ($ctime, $lastmt) = @_; | |
847 | return "SELECT * from DailyStat WHERE mtime >= $lastmt"; | |
8bf584ae DM |
848 | }; |
849 | ||
850 | my $merge_sth = $dbh->prepare( | |
851 | 'INSERT INTO DailyStat ' . | |
852 | '(Time,CountIn,CountOut,BytesIn,BytesOut,VirusIn,VirusOut,SpamIn,SpamOut,' . | |
853 | 'BouncesIn,BouncesOut,GreylistCount,SPFCount,RBLCount,PTimeSum,Mtime) ' . | |
854 | 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ' . | |
855 | 'ON CONFLICT (Time) DO UPDATE SET ' . | |
856 | 'CountIn = excluded.CountIn, CountOut = excluded.CountOut, ' . | |
857 | 'BytesIn = excluded.BytesIn, BytesOut = excluded.BytesOut, ' . | |
858 | 'VirusIn = excluded.VirusIn, VirusOut = excluded.VirusOut, ' . | |
859 | 'SpamIn = excluded.SpamIn, SpamOut = excluded.SpamOut, ' . | |
860 | 'BouncesIn = excluded.BouncesIn, BouncesOut = excluded.BouncesOut, ' . | |
861 | 'GreylistCount = excluded.GreylistCount, SPFCount = excluded.SpfCount, ' . | |
862 | 'RBLCount = excluded.RBLCount, ' . | |
863 | 'PTimeSum = excluded.PTimeSum, MTime = excluded.MTime'); | |
9430b6d4 DM |
864 | |
865 | my $mergefunc = sub { | |
986eec31 | 866 | my ($ref) = @_; |
9430b6d4 | 867 | |
dbdf1298 | 868 | $merge_sth->execute( |
8bf584ae DM |
869 | $ref->{time}, $ref->{countin}, $ref->{countout}, |
870 | $ref->{bytesin}, $ref->{bytesout}, | |
871 | $ref->{virusin}, $ref->{virusout}, $ref->{spamin}, $ref->{spamout}, | |
872 | $ref->{bouncesin}, $ref->{bouncesout}, $ref->{greylistcount}, | |
873 | $ref->{spfcount}, $ref->{rblcount}, $ref->{ptimesum}, $ref->{mtime}); | |
9430b6d4 DM |
874 | }; |
875 | ||
986eec31 | 876 | return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'DailyStat', $selectfunc, $mergefunc); |
9430b6d4 DM |
877 | } |
878 | ||
879 | sub sync_virusinfo_db { | |
880 | my ($dbh, $rdb, $ni) = @_; | |
881 | ||
882 | my $selectfunc = sub { | |
883 | my ($ctime, $lastmt) = @_; | |
884 | return "SELECT * from VirusInfo WHERE mtime >= $lastmt"; | |
885 | }; | |
886 | ||
80615ad6 DM |
887 | my $merge_sth = $dbh->prepare( |
888 | 'INSERT INTO VirusInfo (Time,Name,Count,MTime) ' . | |
889 | 'VALUES (?,?,?,?) ' . | |
890 | 'ON CONFLICT (Time,Name) DO UPDATE SET ' . | |
891 | 'Count = excluded.Count , MTime = excluded.MTime'); | |
892 | ||
9430b6d4 | 893 | my $mergefunc = sub { |
986eec31 | 894 | my ($ref) = @_; |
9430b6d4 | 895 | |
80615ad6 | 896 | $merge_sth->execute($ref->{time}, $ref->{name}, $ref->{count}, $ref->{mtime}); |
9430b6d4 DM |
897 | }; |
898 | ||
986eec31 | 899 | return $sync_generic_mtime_db->($dbh, $rdb, $ni, 'VirusInfo', $selectfunc, $mergefunc); |
9430b6d4 DM |
900 | } |
901 | ||
902 | sub sync_deleted_nodes_from_master { | |
903 | my ($ldb, $masterdb, $cinfo, $masterni, $rsynctime_ref) = @_; | |
904 | ||
905 | my $rsynctime = 0; | |
906 | ||
907 | my $cid_hash = {}; # fast lookup | |
5c6e6eeb | 908 | foreach my $ni (values %{$cinfo->{ids}}) { |
9430b6d4 DM |
909 | $cid_hash->{$ni->{cid}} = $ni; |
910 | } | |
911 | ||
912 | my $spooldir = $PMG::MailQueue::spooldir; | |
913 | ||
5c6e6eeb DM |
914 | my $maxcid = $cinfo->{master}->{maxcid} // 0; |
915 | ||
916 | for (my $rcid = 1; $rcid <= $maxcid; $rcid++) { | |
9430b6d4 DM |
917 | next if $cid_hash->{$rcid}; |
918 | ||
919 | my $done_marker = "$spooldir/cluster/$rcid/.synced-deleted-node"; | |
920 | ||
921 | next if -f $done_marker; # already synced | |
922 | ||
923 | syslog('info', "syncing deleted node $rcid from master '$masterni->{ip}'"); | |
924 | ||
925 | my $starttime = [ gettimeofday() ]; | |
926 | sync_spooldir($masterni->{ip}, $masterni->{name}, $rcid); | |
927 | $$rsynctime_ref += tv_interval($starttime); | |
928 | ||
929 | my $fake_ni = { | |
930 | ip => $masterni->{ip}, | |
931 | name => $masterni->{name}, | |
932 | cid => $rcid, | |
933 | }; | |
934 | ||
935 | sync_quarantine_db($ldb, $masterdb, $fake_ni); | |
936 | ||
937 | sync_statistic_db ($ldb, $masterdb, $fake_ni); | |
938 | ||
939 | open(my $fh, ">>", $done_marker); | |
940 | } | |
941 | } | |
942 | ||
943 | ||
0854fb22 | 944 | 1; |