]> git.proxmox.com Git - pmg-api.git/blob - PMG/Service/pmgtunnel.pm
PMG/Service/pmgtunnel.pm: avoid use of $cinfo->{remnodes}
[pmg-api.git] / PMG / Service / pmgtunnel.pm
1 package PMG::Service::pmgtunnel;
2
3 use strict;
4 use warnings;
5 use Data::Dumper;
6 use Time::HiRes qw (gettimeofday);
7
8 use PVE::SafeSyslog;
9 use PVE::Tools qw(extract_param);
10 use PVE::INotify;
11 use PVE::Daemon;
12
13 use PMG::RESTEnvironment;
14 use PMG::DBTools;
15 use PMG::RuleDB;
16 use PMG::Cluster;
17 use PMG::ClusterConfig;
18 use PMG::Statistic;
19
20 use base qw(PVE::Daemon);
21
# Original invocation, handed to the daemon constructor below.
my $cmdline = [$0, @ARGV];

# Options passed through to the PVE::Daemon constructor.
my %daemon_options = (
    restart_on_error => 5,
    stop_wait_time   => 5,
);

my $daemon = __PACKAGE__->new('pmgtunnel', $cmdline, %daemon_options);

# Set by hup(); run() reacts to it by calling restart_daemon().
my $restart_request = 0;

# Epoch time at which run() reloads the cluster configuration again.
my $next_update = 0;

# Not referenced by any code visible in this file section.
my $cycle = 0;

# Seconds between two reloads of the cluster configuration in run().
my $updatetime = 10;

# pid => { ip, cid, dbport } for every tunnel process we started.
my $workers = {};

# cluster node id => earliest epoch time at which its tunnel may be started.
my $delayed_exec = {};

# cluster node id => number of start attempts so far.
my $startcount = {};
# Reap all exited child processes without blocking.
#
# Installed as the SIGCHLD handler by run(), which also calls it directly
# once per second. For a child that is still registered in $workers the exit
# is logged and a restart delay is recorded for its cluster node id: 10
# seconds, or 60 seconds once that tunnel has been started more than 5 times.
# Children that are no longer registered are reaped silently.
sub finish_children {
    # waitpid() overwrites $? and may change $!. As this also runs as a
    # signal handler, keep those changes away from the interrupted code.
    local ($!, $?);

    while ((my $cpid = waitpid(-1, POSIX::WNOHANG())) > 0) {
        # Remove the record first, so nobody can look up a pid that is gone.
        my $worker = delete $workers->{$cpid};
        next if !defined($worker);

        my ($ip, $cid) = @$worker{qw(ip cid)};
        syslog('err', "tunnel finished $cpid $ip");

        my $starts = $startcount->{$cid} // 0;
        $delayed_exec->{$cid} = time() + ($starts > 5 ? 60 : 10);
    }
}
49
# Start an ssh port-forwarding tunnel for every cluster node that has no
# running worker yet.
#
# $cinfo is the cluster configuration; this sub reads
#   $cinfo->{local}->{type}  local role, undefined or '-' means "no tunnels"
#   $cinfo->{local}->{ip}    local address, never tunnelled to
#   $cinfo->{ids}            node records keyed by cluster node id
#   $cinfo->{dbport}         local forwarding port per cluster node id
#
# A node is skipped while the restart delay stored in $delayed_exec has not
# expired. Every attempt increments $startcount for that node.
sub start_tunnels {
    my ($self, $cinfo) = @_;

    my $role = $cinfo->{local}->{type} // '-';
    return if $role eq '-';

    foreach my $cid (keys %{$cinfo->{ids}}) {
        my $ni = $cinfo->{ids}->{$cid};
        next if $ni->{ip} eq $cinfo->{local}->{ip}; # just to be sure

        my $dbport = $cinfo->{dbport}->{$cid};
        next if !$dbport; # just to be sure

        # Look at the values only: indexing by a pid the SIGCHLD handler
        # has just removed would autovivify an empty worker entry.
        my $running = grep {
            defined($_->{ip}) && $_->{ip} eq $ni->{ip}
        } values %$workers;
        next if $running;

        next if $delayed_exec->{$cid} && (time() < $delayed_exec->{$cid});

        $delayed_exec->{$cid} = 0;
        $startcount->{$cid}++;

        my $pid = fork;

        if (!defined($pid)) {

            syslog('err', "can't fork tunnel: $!");

        } elsif ($pid) { # parent

            # Store the complete record in a single assignment, so the
            # SIGCHLD handler never finds a half-filled entry.
            $workers->{$pid} = {
                ip     => $ni->{ip},
                cid    => $cid,
                dbport => $dbport,
            };

            if ($startcount->{$cid} > 1) {
                syslog('info', "restarting crashed tunnel $pid $ni->{ip}");
            } else {
                syslog('info', "starting tunnel $pid $ni->{ip}");
            }

        } else { # child

            $self->after_fork_cleanup();

            # make sure we use ipv4 127.0.0.1 (instead of ipv6 :::1)
            exec('/usr/bin/ssh', '-N', '-o', 'BatchMode=yes',
                 '-L', "$dbport:127.0.0.1:5432",
                 $ni->{ip})
                or syslog('err', "can't exec ssh for tunnel to $ni->{ip}: $!");

            # Only reached when exec failed; report that as a failure.
            exit(1);
        }
    }
}
105
# Terminate tunnels that are no longer wanted.
#
# A worker is kept only if the local node has a cluster role and some node
# in $cinfo->{ids} has the worker's ip together with the worker's forwarding
# port in $cinfo->{dbport}. Every other worker is sent SIGTERM, removed from
# $workers, and gets a restart delay in $delayed_exec (10 seconds, or 60
# seconds once it has been started more than 5 times).
sub purge_tunnels {
    my ($self, $cinfo) = @_;

    # Does not depend on the worker, so look it up once.
    my $role = $cinfo->{local}->{type} // '-';

    foreach my $cpid (keys %$workers) {
        # The SIGCHLD handler may have removed this entry after keys() was
        # evaluated; do not autovivify it back and signal a reaped pid.
        my $worker = $workers->{$cpid};
        next if !defined($worker);

        my ($ip, $dbport, $cid) = @$worker{qw(ip dbport cid)};

        my $found = 0;
        if ($role ne '-') {
            foreach my $ni (values %{$cinfo->{ids}}) {
                my $ni_dbport = $cinfo->{dbport}->{$ni->{cid}};
                next if !defined($ni_dbport); # node without a port
                next if $ni->{ip} ne $ip || $ni_dbport ne $dbport;
                $found = 1;
                last;
            }
        }
        next if $found;

        syslog ('info', "trying to finish tunnel $cpid $ip");

        # Unregister before signalling, so the expected exit is not logged
        # as an error by finish_children().
        delete $workers->{$cpid};
        kill(15, $cpid);

        my $starts = $startcount->{$cid} // 0;
        $delayed_exec->{$cid} = time() + ($starts > 5 ? 60 : 10);
    }
}
131
# Daemon init hook. Intentionally does nothing; the former debug call
# syslog('INIT') stays disabled.
sub init {
    return;
}
135
# Daemon shutdown hook: send SIGTERM to every registered tunnel process and
# forget those that were signalled successfully or no longer exist.
sub shutdown {
    my ($self) = @_;

    syslog('info' , "server closing");

    for my $pid (keys %$workers) {
        # Keep the entry only if SIGTERM could not be delivered although
        # the process still exists.
        my $signalled = kill(15, $pid);
        next if !$signalled && kill(0, $pid);

        my $ip = $workers->{$pid}->{ip};
        delete $workers->{$pid};
        syslog ('info', "successfully deleted tunnel $pid $ip");
    }

    # Collect children that have already exited. WNOHANG makes this
    # non-blocking, so processes that are still terminating are not
    # waited for.
    while (waitpid(-1, POSIX::WNOHANG()) > 0) {
        # nothing to do, just reap
    }

    # $self->exit_daemon(0);
}
154
# Reload hook (presumably called by PVE::Daemon on SIGHUP -- confirm).
# Only records the request; run() picks it up within a second and calls
# restart_daemon(). Returns 1.
sub hup {
    my ($self) = @_;

    $restart_request = 1;

    return 1;
}
160
161
162
# Main loop of the daemon.
#
# Every $updatetime seconds the cluster configuration is reloaded, unwanted
# tunnels are purged and missing ones are started. Between two reloads the
# loop wakes up once per second to reap finished children and to react to a
# restart request (see hup) or to $self->{terminate}.
sub run {
    my ($self) = @_;

    local $SIG{CHLD} = \&finish_children;

    while (1) {

        $next_update = time() + $updatetime;

        eval {
            my $cinfo = PMG::ClusterConfig->new(); # reload
            $self->purge_tunnels($cinfo);
            $self->start_tunnels($cinfo);
        };
        my $err = $@;
        syslog('err', "status update error: $err") if $err;

        # The tick counter bounds the wait even if the clock jumps.
        for (my $ticks = 0; $ticks < $updatetime; $ticks++) {
            last if time() >= $next_update;
            last if $restart_request || $self->{terminate};

            finish_children();

            sleep (1);
        }

        last if $self->{terminate};

        if ($restart_request) {
            $self->restart_daemon();
        }
    }
}
198
# CLI/API method 'status': prints "running" or "stopped" on stdout,
# depending on $daemon->running(), and returns undef.
__PACKAGE__->register_method ({
    name        => 'status',
    path        => 'status',
    method      => 'GET',
    description => "Print cluster tunnel status.",
    parameters  => {
        additionalProperties => 0,
        properties           => {},
    },
    returns => { type => 'null' },
    code    => sub {
        my ($param) = @_;

        if ($daemon->running()) {
            print "running\n";
        } else {
            print "stopped\n";
        }

        return undef;
    },
});
217
218
# Register the standard daemon control commands with their help texts.
$daemon->register_start_command("Start the Cluster Tunnel Daemon");
$daemon->register_stop_command("Stop the Cluster Tunnel Daemon");
$daemon->register_restart_command(1, "Restart the Cluster Tunnel Daemon");

# Command table: every command maps to the method of the same name in this
# package and takes no positional arguments.
our $cmddef = {
    map { ($_ => [ __PACKAGE__, $_, [] ]) } qw(start restart stop status)
};