]> git.proxmox.com Git - pve-manager.git/blame - PVE/HTTPServer.pm
move HTTPServer into separate file
[pve-manager.git] / PVE / HTTPServer.pm
CommitLineData
57f93db1
DM
1package PVE::HTTPServer;
2
3use strict;
4use warnings;
5use Socket qw(IPPROTO_TCP TCP_NODELAY SOMAXCONN);
6use POSIX qw(strftime EINTR EAGAIN);
7use Fcntl;
8use File::stat qw();
9use AnyEvent::Strict;
10use AnyEvent::Util qw(guard fh_nonblocking WSAEWOULDBLOCK WSAEINPROGRESS);
11use AnyEvent::Handle;
12use AnyEvent::TLS;
13use AnyEvent::IO;
14use AnyEvent::HTTP;
15use Fcntl ();
16use Compress::Zlib;
17use PVE::SafeSyslog;
18use PVE::INotify;
19use PVE::RPCEnvironment;
20use PVE::REST;
21
22use URI;
23use HTTP::Status qw(:constants);
24use HTTP::Headers;
25use HTTP::Response;
26
27use CGI; # fixme: remove this!
28# DOS attack prevention
29# fixme: remove CGI.pm
30$CGI::DISABLE_UPLOADS = 1; # no uploads
31$CGI::POST_MAX = 1024 * 10; # max 10K posts
32
33use Scalar::Util qw/weaken/; # fixme: remove?
34use Data::Dumper; # fixme: remove
35
36my $known_methods = {
37 GET => 1,
38 POST => 1,
39 PUT => 1,
40 DELETE => 1,
41};
42
43sub log_request {
44 my ($self, $reqstate) = @_;
45
46 return if !$self->{loghdl};
47
48 my $loginfo = $reqstate->{log};
49
50 # like apache2 common log format
51 # LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-agent}i\""
52
53 my $peerip = $reqstate->{peer_host} || '-';
54 my $userid = $loginfo->{userid} || '-';
55 my $content_length = defined($loginfo->{content_length}) ? $loginfo->{content_length} : '-';
56 my $code = $loginfo->{code} || 500;
57 my $requestline = $loginfo->{requestline} || '-';
58 my $timestr = strftime("%d/%b/%Y:%H:%M:%S %z", localtime());
59
60 my $msg = "$peerip - $userid [$timestr] \"$requestline\" $code $content_length\n";
61
62 $self->{loghdl}->push_write($msg);
63}
64
65sub log_aborted_request {
66 my ($self, $reqstate, $error) = @_;
67
68 my $r = $reqstate->{request};
69 return if !$r; # no active request
70
71 if ($error) {
72 syslog("err", "problem with client $reqstate->{peer_host}; $error");
73 }
74
75 $self->log_request($reqstate);
76}
77
78sub client_do_disconnect {
79 my ($self, $reqstate) = @_;
80
81 my $hdl = delete $reqstate->{hdl};
82
83 if (!$hdl) {
84 syslog('err', "detected empty handle");
85 return;
86 }
87
88 #print "close connection $hdl\n";
89
90 shutdown($hdl->{fh}, 1);
91 # clear all handlers
92 $hdl->on_drain(undef);
93 $hdl->on_read(undef);
94 $hdl->on_eof(undef);
95 $self->{conn_count}--;
96
97 #print "$$: client_do_disconnect $self->{conn_count} $hdl\n";
98}
99
100sub finish_response {
101 my ($self, $reqstate) = @_;
102
103 my $hdl = $reqstate->{hdl};
104
105 delete $reqstate->{log};
106 delete $reqstate->{request};
107 delete $reqstate->{proto};
108
109 if (!$self->{end_loop} && $reqstate->{keep_alive} > 0) {
110 # print "KEEPALIVE $reqstate->{keep_alive}\n";
111 $hdl->on_read(sub {
112 eval { $self->push_request_header($reqstate); };
113 warn $@ if $@;
114 });
115 } else {
116 $hdl->on_drain (sub {
117 eval {
118 $self->client_do_disconnect($reqstate);
119 };
120 warn $@ if $@;
121 });
122 }
123}
124
125sub response {
126 my ($self, $reqstate, $resp, $mtime, $nocomp) = @_;
127
128 #print "$$: send response: " . Dumper($resp);
129
130 my $code = $resp->code;
131 my $msg = $resp->message || HTTP::Status::status_message($code);
132 ($msg) = $msg =~m/^(.*)$/m;
133 my $content = $resp->content;
134
135 if ($code =~ /^(1\d\d|[23]04)$/) {
136 # make sure content we have no content
137 $content = "";
138 }
139
140 $reqstate->{keep_alive} = 0 if ($code >= 300) || $self->{end_loop};
141
142 $reqstate->{log}->{code} = $code;
143
144 my $res = "HTTP/1.0 $code $msg\015\012";
145
146 my $ctime = time();
147 my $date = HTTP::Date::time2str($ctime);
148 $resp->header('Date' => $date);
149 if ($mtime) {
150 $resp->header('Last-Modified' => HTTP::Date::time2str($mtime));
151 } else {
152 $resp->header('Expires' => $date);
153 $resp->header('Cache-Control' => "max-age=0");
154 $resp->header("Pragma", "no-cache");
155 }
156
157 $resp->header('Server' => "pve-api-daemon/3.0");
158
159 my $content_length;
160 if (ref($content) eq "CODE") {
161 $reqstate->{keep_alive} = 0;
162
163 # fixme:
164
165 } elsif ($content) {
166
167 $content_length = length($content);
168
169 if (!$nocomp && ($content_length > 1024)) {
170 my $comp = Compress::Zlib::memGzip($content);
171 $resp->header('Content-Encoding', 'gzip');
172 $content = $comp;
173 $content_length = length($content);
174 }
175 $resp->header("Content-Length" => $content_length);
176 $reqstate->{log}->{content_length} = $content_length;
177 } else {
178 $resp->remove_header("Content-Length");
179 }
180
181 if ($reqstate->{keep_alive} > 0) {
182 $resp->push_header('Connection' => 'Keep-Alive');
183 } else {
184 $resp->header('Connection' => 'close');
185 }
186
187 $res .= $resp->headers_as_string("\015\012");
188 #print "SEND(supress content) $res\n";
189
190 $res .= "\015\012";
191 $res .= $content;
192
193 $self->log_request($reqstate, $reqstate->{request});
194
195 $reqstate->{hdl}->push_write($res);
196 $self->finish_response($reqstate);
197}
198
199sub error {
200 my ($self, $reqstate, $code, $msg, $hdr, $content) = @_;
201
202 eval {
203 my $resp = HTTP::Response->new($code, $msg, $hdr, $content);
204 $self->response($reqstate, $resp);
205 };
206 warn $@ if $@;
207}
208
209sub send_file_start {
210 my ($self, $reqstate, $filename) = @_;
211
212 eval {
213 # print "SEND FILE $filename\n";
214 # Note: aio_load() this is not really async unless we use IO::AIO!
215 eval {
216
217 my $fh = IO::File->new($filename, '<') ||
218 die "$!\n";
219 my $stat = File::stat::stat($fh) ||
220 die "$!\n";
221
222 my $data;
223 my $len = sysread($fh, $data, $stat->size);
224 die "got short file\n" if !defined($len) || $len != $stat->size;
225
226 my $ct;
227 if ($filename =~ m/\.css$/) {
228 $ct = 'text/css';
229 } elsif ($filename =~ m/\.js$/) {
230 $ct = 'application/javascript';
231 } elsif ($filename =~ m/\.png$/) {
232 $ct = 'image/png';
233 } elsif ($filename =~ m/\.gif$/) {
234 $ct = 'image/gif';
235 } elsif ($filename =~ m/\.jar$/) {
236 $ct = 'application/java-archive';
237 } else {
238 die "unable to detect content type";
239 }
240
241 my $header = HTTP::Headers->new(Content_Type => $ct);
242 my $resp = HTTP::Response->new(200, "OK", $header, $data);
243 $self->response($reqstate, $resp, $stat->mtime);
244 };
245 if (my $err = $@) {
246 $self->error($reqstate, 501, $err);
247 }
248 };
249
250 warn $@ if $@;
251}
252
253sub proxy_request {
254 my ($self, $reqstate, $r, $clientip, $host, $method, $abs_uri, $ticket, $token, $params) = @_;
255
256 eval {
257 my $target;
258 if ($host eq 'localhost') {
259 $target = "http://$host:85$abs_uri";
260 } else {
261 $target = "https://$host:8006$abs_uri";
262 }
263
264 my $headers = {
265 PVEDisableProxy => 'true',
266 PVEClientIP => $clientip,
267 };
268
269 my $cookie_name = 'PVEAuthCookie';
270
271 $headers->{'cookie'} = PVE::REST::create_auth_cookie($ticket) if $ticket;
272 $headers->{'CSRFPreventionToken'} = $token if $token;
273
274 my $content;
275
276 if ($method eq 'POST' || $method eq 'PUT') {
277 $headers->{'Content-Type'} = 'application/x-www-form-urlencoded';
278 # We use a temporary URI object to format
279 # the application/x-www-form-urlencoded content.
280 my $url = URI->new('http:');
281 $url->query_form(%$params);
282 $content = $url->query;
283 if (defined($content)) {
284 $headers->{'Content-Length'} = length($content);
285 }
286 }
287
288 # fixme: tls_ctx;
289
290 my $w; $w = http_request(
291 $method => $target,
292 headers => $headers,
293 timeout => 30,
294 resurse => 0,
295 body => $content,
296 sub {
297 my ($body, $hdr) = @_;
298
299 undef $w;
300
301 eval {
302 my $code = delete $hdr->{Status};
303 my $msg = delete $hdr->{Reason};
304 delete $hdr->{URL};
305 delete $hdr->{HTTPVersion};
306 my $header = HTTP::Headers->new(%$hdr);
307 my $resp = HTTP::Response->new($code, $msg, $header, $body);
308 $self->response($reqstate, $resp, undef, 1);
309 };
310 warn $@ if $@;
311 });
312 };
313 warn $@ if $@;
314}
315
316my $extract_params = sub {
317 my ($r, $method) = @_;
318
319 # NOTE: HTTP::Request::Params return undef instead of ''
320 #my $parser = HTTP::Request::Params->new({req => $r});
321 #my $params = $parser->params;
322
323 my $post_params = {};
324
325 if ($method eq 'PUT' || $method eq 'POST') {
326 $post_params = CGI->new($r->content())->Vars;
327 }
328
329 my $query_params = CGI->new($r->url->query)->Vars;
330 my $params = $post_params || {};
331
332 foreach my $k (keys %{$query_params}) {
333 $params->{$k} = $query_params->{$k};
334 }
335
336 return PVE::Tools::decode_utf8_parameters($params);
337};
338
339sub handle_api2_request {
340 my ($self, $reqstate) = @_;
341
342 eval {
343 my $r = $reqstate->{request};
344 my $method = $r->method();
345 my $path = $r->uri->path();
346
347 my ($rel_uri, $format) = PVE::REST::split_abs_uri($path);
348 if (!$format) {
349 $self->error($reqstate, HTTP_NOT_IMPLEMENTED, "no such uri");
350 return;
351 }
352
353 my $rpcenv = $self->{rpcenv};
354 my $headers = $r->headers;
355
356 my $token = $headers->header('CSRFPreventionToken');
357
358 my $cookie = $headers->header('Cookie');
359
360 my $ticket = PVE::REST::extract_auth_cookie($cookie);
361
362 my $params = &$extract_params($r, $method);
363
364 my $clientip = $headers->header('PVEClientIP');
365
366 $rpcenv->init_request(params => $params);
367
368 my $res = PVE::REST::rest_handler($rpcenv, $clientip, $method, $path, $rel_uri, $ticket, $token);
369
370 # fixme: eval { $userid = $rpcenv->get_user(); };
371 my $userid = $rpcenv->{user}; # this is faster
372 $rpcenv->set_user(undef); # clear after request
373
374 $reqstate->{log}->{userid} = $userid;
375
376 if ($res->{proxy}) {
377
378 if ($self->{trusted_env}) {
379 $self->error($reqstate, HTTP_INTERNAL_SERVER_ERROR, "proxy not allowed");
380 return;
381 }
382
383 $self->proxy_request($reqstate, $r, $clientip, $res->{proxy}, $method,
384 $r->uri, $ticket, $token, $res->{proxy_params});
385 return;
386
387 }
388
389 PVE::REST::prepare_response_data($format, $res);
390 my ($raw, $ct) = PVE::REST::format_response_data($format, $res, $path);
391
392 my $resp = HTTP::Response->new($res->{status}, $res->{message});
393 $resp->header("Content-Type" => $ct);
394 $resp->content($raw);
395 $self->response($reqstate, $resp);
396
397 return;
398 };
399 warn $@ if $@;
400}
401
402sub handle_request {
403 my ($self, $reqstate) = @_;
404
405 #print "REQUEST" . Dumper($reqstate->{request});
406
407 eval {
408 my $r = $reqstate->{request};
409 my $method = $r->method();
410 my $path = $r->uri->path();
411
412 # print "REQUEST $path\n";
413
414 if (!$known_methods->{$method}) {
415 my $resp = HTTP::Response->new(HTTP_NOT_IMPLEMENTED, "method '$method' not available");
416 $self->response($reqstate, $resp);
417 return;
418 }
419
420 if ($path =~ m!/api2!) {
421 $self->handle_api2_request($reqstate);
422 return;
423 }
424
425 if ($self->{pages} && ($method eq 'GET') && (my $handler = $self->{pages}->{$path})) {
426 if (ref($handler) eq 'CODE') {
427 my ($resp, $userid) = &$handler($self, $reqstate->{request});
428 $self->response($reqstate, $resp);
429 } elsif (ref($handler) eq 'HASH') {
430 if (my $filename = $handler->{file}) {
431 my $fh = IO::File->new($filename) ||
432 die "unable to open file '$filename' - $!\n";
433 send_file_start($self, $reqstate, $filename);
434 } else {
435 die "internal error - no handler";
436 }
437 } else {
438 die "internal error - no handler";
439 }
440 return;
441 }
442
443 if ($self->{dirs} && ($method eq 'GET')) {
444 # we only allow simple names
445 if ($path =~ m!^(/\S+/)([a-zA-Z0-9\-\_\.]+)$!) {
446 my ($subdir, $file) = ($1, $2);
447 if (my $dir = $self->{dirs}->{$subdir}) {
448 my $filename = "$dir$file";
449 my $fh = IO::File->new($filename) ||
450 die "unable to open file '$filename' - $!\n";
451 send_file_start($self, $reqstate, $filename);
452 return;
453 }
454 }
455 }
456
457 die "no such file '$path'";
458 };
459 if (my $err = $@) {
460 $self->error($reqstate, 501, $err);
461 }
462}
463
464sub unshift_read_header {
465 my ($self, $reqstate, $state) = @_;
466
467 $state = {} if !$state;
468
469 $reqstate->{hdl}->unshift_read(line => sub {
470 my ($hdl, $line) = @_;
471
472 eval {
473 #print "$$: got header: $line\n";
474
475 my $r = $reqstate->{request};
476 if ($line eq '') {
477
478 $r->push_header($state->{key}, $state->{val})
479 if $state->{key};
480
481 my $conn = $r->header('Connection');
482
483 if ($conn) {
484 $reqstate->{keep_alive} = 0 if $conn =~ m/close/oi;
485 } else {
486 if ($reqstate->{proto}->{ver} < 1001) {
487 $reqstate->{keep_alive} = 0;
488 }
489 }
490
491 # how much content to read?
492 my $te = $r->header('Transfer-Encoding');
493 my $len = $r->header('Content-Length');
494 my $pveclientip = $r->header('PVEClientIP');
495
496 # fixme:
497 if ($self->{trusted_env} && $pveclientip) {
498 $reqstate->{peer_host} = $pveclientip;
499 } else {
500 $r->header('PVEClientIP', $reqstate->{peer_host});
501 }
502
503 if ($te && lc($te) eq 'chunked') {
504 # Handle chunked transfer encoding
505 $self->error($reqstate, 501, "chunked transfer encoding not supported");
506 } elsif ($te) {
507 $self->error($reqstate, 501, "Unknown transfer encoding '$te'");
508 } elsif (defined($len)) {
509 $reqstate->{hdl}->unshift_read (chunk => $len, sub {
510 my ($hdl, $data) = @_;
511 $r->content($data);
512 $self->handle_request($reqstate);
513 });
514 } else {
515 $self->handle_request($reqstate);
516 }
517 } elsif ($line =~ /^([^:\s]+)\s*:\s*(.*)/) {
518 $r->push_header($state->{key}, $state->{val}) if $state->{key};
519 ($state->{key}, $state->{val}) = ($1, $2);
520 $self->unshift_read_header($reqstate, $state);
521 } elsif ($line =~ /^\s+(.*)/) {
522 $state->{val} .= " $1";
523 $self->unshift_read_header($reqstate, $state);
524 } else {
525 $self->error($reqstate, 506, "unable to parse request header");
526 }
527 };
528 warn $@ if $@;
529 });
530};
531
532sub push_request_header {
533 my ($self, $reqstate) = @_;
534
535 eval {
536 $reqstate->{hdl}->push_read(line => sub {
537 my ($hdl, $line) = @_;
538
539 eval {
540 #print "got request header: $line\n";
541
542 $reqstate->{keep_alive}--;
543
544 if ($line =~ /(\S+)\040(\S+)\040HTTP\/(\d+)\.(\d+)/o) {
545 my ($method, $uri, $maj, $min) = ($1, $2, $3, $4);
546
547 if ($maj != 1) {
548 $self->error($reqstate, 506, "http protocol version $maj.$min not supported");
549 return;
550 }
551
552 $self->{request_count}++; # only count valid request headers
553 if ($self->{request_count} >= $self->{max_requests}) {
554 $self->{end_loop} = 1;
555 }
556 $reqstate->{log} = { requestline => $line };
557 $reqstate->{proto}->{maj} = $maj;
558 $reqstate->{proto}->{min} = $min;
559 $reqstate->{proto}->{ver} = $maj*1000+$min;
560 $reqstate->{request} = HTTP::Request->new($method, $uri);
561
562 $self->unshift_read_header($reqstate);
563 } elsif ($line eq '') {
564 # ignore empty lines before requests (browser bugs?)
565 $self->push_request_header($reqstate);
566 } else {
567 $self->error($reqstate, 400, 'bad request');
568 }
569 };
570 warn $@ if $@;
571 });
572 };
573 warn $@ if $@;
574}
575
576sub accept {
577 my ($self) = @_;
578
579 my $clientfh;
580
581 return if $self->{end_loop};
582
583 # we need to m make sure that only one process calls accept
584 while (!flock($self->{lockfh}, Fcntl::LOCK_EX())) {
585 next if $! == EINTR;
586 die "could not get lock on file '$self->{lockfile}' - $!\n";
587 }
588
589 my $again = 0;
590 my $errmsg;
591 eval {
592 while (!$self->{end_loop} &&
593 !defined($clientfh = $self->{socket}->accept()) &&
594 ($! == EINTR)) {};
595
596 if ($self->{end_loop}) {
597 $again = 0;
598 } else {
599 $again = ($! == EAGAIN || $! == WSAEWOULDBLOCK);
600 if (!defined($clientfh)) {
601 $errmsg = "failed to accept connection: $!\n";
602 }
603 }
604 };
605 warn $@ if $@;
606
607 flock($self->{lockfh}, Fcntl::LOCK_UN());
608
609 if (!defined($clientfh)) {
610 return if $again;
611 die $errmsg if $errmsg;
612 }
613
614 fh_nonblocking $clientfh, 1;
615
616 $self->{conn_count}++;
617
618 print "$$: ACCEPT OK $self->{conn_count} FH" . $clientfh->fileno() . "\n";
619
620 return $clientfh;
621}
622
623sub wait_end_loop {
624 my ($self) = @_;
625
626 $self->{end_loop} = 1;
627
628 undef $self->{socket_watch};
629
630 if ($self->{conn_count} <= 0) {
631 $self->{end_cond}->send(1);
632 return;
633 }
634
635 # else we need to wait until all open connections gets closed
636 my $w; $w = AnyEvent->timer (after => 1, interval => 1, cb => sub {
637 eval {
638 # fixme: test for active connections instead?
639 if ($self->{conn_count} <= 0) {
640 undef $w;
641 $self->{end_cond}->send(1);
642 }
643 };
644 warn $@ if $@;
645 });
646}
647
648sub accept_connections {
649 my ($self) = @_;
650
651 eval {
652
653 while (my $clientfh = $self->accept()) {
654
655 my $reqstate = { keep_alive => $self->{keep_alive} };
656
657 if (my $sin = getpeername($clientfh)) {
658 my ($pport, $phost) = Socket::unpack_sockaddr_in($sin);
659 ($reqstate->{peer_port}, $reqstate->{peer_host}) = ($pport, Socket::inet_ntoa($phost));
660 }
661
662 $reqstate->{hdl} = AnyEvent::Handle->new(
663 fh => $clientfh,
664 rbuf_max => 32768, # fixme: set smaller max read buffer ?
665 timeout => $self->{timeout},
666 linger => 0, # avoid problems with ssh - really needed ?
667 on_eof => sub {
668 my ($hdl) = @_;
669 eval {
670 $self->log_aborted_request($reqstate);
671 $self->client_do_disconnect($reqstate);
672 };
673 if (my $err = $@) { syslog('err', $err); }
674 },
675 on_error => sub {
676 my ($hdl, $fatal, $message) = @_;
677 eval {
678 $self->log_aborted_request($reqstate, $message);
679 $self->client_do_disconnect($reqstate);
680 };
681 if (my $err = $@) { syslog('err', "$err"); }
682 },
683 ($self->{tls_ctx} ? (tls => "accept", tls_ctx => $self->{tls_ctx}) : ()));
684
685 print "$$: ACCEPT OK $reqstate->{hdl} $self->{conn_count}\n";
686
687 $self->push_request_header($reqstate);
688 }
689 };
690
691 if (my $err = $@) {
692 syslog('err', $err);
693 $self->{end_loop} = 1;
694 }
695
696 $self->wait_end_loop() if $self->{end_loop};
697}
698
699sub open_access_log {
700 my ($self, $filename) = @_;
701
702 my $old_mask = umask(0137);;
703 my $logfh = IO::File->new($filename, ">>") ||
704 die "unable to open log file '$filename' - $!\n";
705 umask($old_mask);
706
707 fh_nonblocking($logfh, 1);
708 $self->{loghdl} = AnyEvent::Handle->new(
709 fh => $logfh,
710 on_error => sub {
711 my ($hdl, $fatal, $msg) = @_;
712 syslog('err', "error writing access log: $msg");
713 delete $self->{loghdl};
714 $hdl->destroy;
715 $self->{end_loop} = 1; # terminate asap
716 });;
717
718 return;
719}
720
721sub new {
722 my ($this, %args) = @_;
723
724 my $class = ref($this) || $this;
725
726 foreach my $req (qw(rpcenv socket lockfh lockfile)) {
727 die "misssing required argument '$req'" if !defined($args{$req});
728 }
729
730 my $self = bless { %args }, $class;
731
732 fh_nonblocking($self->{socket}, 1);
733
734 $self->{end_loop} = 0;
735 $self->{conn_count} = 0;
736 $self->{request_count} = 0;
737 $self->{timeout} = 5 if !$self->{timeout};
738 $self->{keep_alive} = 0 if !defined($self->{keep_alive});
739 $self->{max_conn} = 800 if !$self->{max_conn};
740 $self->{max_requests} = 8000 if !$self->{max_requests};
741
742 $self->{end_cond} = AnyEvent->condvar;
743
744 if ($self->{ssl}) {
745 $self->{tls_ctx} = AnyEvent::TLS->new(%{$self->{ssl}});
746 }
747
748 # fixme: logrotate?
749 $self->open_access_log($self->{logfile}) if $self->{logfile};
750
751 $self->{socket_watch} = AnyEvent->io(fh => $self->{socket}, poll => 'r', cb => sub {
752 eval {
753 if ($self->{conn_count} >= $self->{max_conn}) {
754 my $w; $w = AnyEvent->timer (after => 1, interval => 1, cb => sub {
755 if ($self->{conn_count} < $self->{max_conn}) {
756 undef $w;
757 $self->accept_connections();
758 }
759 });
760 } else {
761 $self->accept_connections();
762 }
763 };
764 warn $@ if $@;
765 });
766
767 $self->{term_watch} = AnyEvent->signal(signal => "TERM", cb => sub {
768 undef $self->{term_watch};
769 $self->wait_end_loop();
770 });
771
772 $self->{quit_watch} = AnyEvent->signal(signal => "QUIT", cb => sub {
773 undef $self->{quit_watch};
774 $self->wait_end_loop();
775 });
776
777 return $self;
778}
779
780sub run {
781 my ($self) = @_;
782
783 $self->{end_cond}->recv;
784}
785
7861;