]> git.proxmox.com Git - pve-manager.git/blob - PVE/HTTPServer.pm
code cleanups
[pve-manager.git] / PVE / HTTPServer.pm
1 package PVE::HTTPServer;
2
3 use strict;
4 use warnings;
5 use Socket qw(IPPROTO_TCP TCP_NODELAY SOMAXCONN);
6 use POSIX qw(strftime EINTR EAGAIN);
7 use Fcntl;
8 use File::stat qw();
9 use AnyEvent::Strict;
10 use AnyEvent::Util qw(guard fh_nonblocking WSAEWOULDBLOCK WSAEINPROGRESS);
11 use AnyEvent::Handle;
12 use AnyEvent::TLS;
13 use AnyEvent::IO;
14 use AnyEvent::HTTP;
15 use Fcntl ();
16 use Compress::Zlib;
17 use PVE::SafeSyslog;
18 use PVE::INotify;
19 use PVE::RPCEnvironment;
20 use PVE::REST;
21
22 use URI;
23 use HTTP::Status qw(:constants);
24 use HTTP::Headers;
25 use HTTP::Response;
26
27 use CGI; # fixme: remove this!
# Limit what CGI.pm accepts (DOS attack prevention)
# fixme: remove CGI.pm
$CGI::POST_MAX = 10 * 1024;    # max 10K posts
$CGI::DISABLE_UPLOADS = 1;     # no uploads
32
33 use Data::Dumper; # fixme: remove
34
# HTTP request methods accepted by handle_request(); any other method is
# answered with HTTP_NOT_IMPLEMENTED.
my $known_methods = { map { $_ => 1 } qw(GET POST PUT DELETE) };
41
# Write one access log line for the request described by $reqstate.
#
# Reads $reqstate->{peer_host} and the fields collected in $reqstate->{log}
# (userid, content_length, code, requestline) and hands the formatted line
# to write_log().
sub log_request {
    my ($self, $reqstate) = @_;

    my $info = $reqstate->{log};

    # like apache2 common log format
    # LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-agent}i\""
    # Missing fields are logged as '-', a missing status code as 500.
    my $length = $info->{content_length};
    $length = '-' if !defined($length);

    my $line = sprintf("%s - %s [%s] \"%s\" %s %s\n",
        $reqstate->{peer_host} || '-',
        $info->{userid} || '-',
        strftime("%d/%b/%Y:%H:%M:%S %z", localtime()),
        $info->{requestline} || '-',
        $info->{code} || 500,
        $length);

    $self->write_log($line);
}
61
# Log a request that ended without a regular response (EOF or handle error).
#
# Does nothing when no request is active on the connection. When $error is
# given it is reported to syslog first; the request is then written to the
# access log via log_request().
sub log_aborted_request {
    my ($self, $reqstate, $error) = @_;

    return if !$reqstate->{request}; # no active request

    if ($error) {
        # peer_host is only set when getpeername() succeeded, so fall back
        # to '-' (same placeholder log_request uses) instead of
        # interpolating undef
        my $peer = $reqstate->{peer_host} || '-';
        syslog("err", "problem with client $peer; $error");
    }

    $self->log_request($reqstate);
}
74
# Tear down the client connection stored in $reqstate->{hdl}.
#
# The handle is removed from $reqstate, the write side of its socket is
# shut down, its drain/read/eof callbacks are cleared and the connection
# counter is decremented. A missing handle is reported to syslog.
sub client_do_disconnect {
    my ($self, $reqstate) = @_;

    my $handle = delete $reqstate->{hdl};

    unless ($handle) {
        syslog('err', "detected empty handle");
        return;
    }

    #print "close connection $handle\n";

    shutdown($handle->{fh}, 1);

    # clear all handlers
    $handle->$_(undef) for qw(on_drain on_read on_eof);

    $self->{conn_count}--;

    if ($self->{debug}) {
        print "$$: CLOSE FH" . $handle->{fh}->fileno() . " CONN$self->{conn_count}\n";
    }
}
96
# Called after a response has been queued for writing.
#
# Drops the per-request state (log, request, proto) and then either arms
# the handle to read the next request (keep-alive) or schedules the
# disconnect for when the handle's on_drain callback fires.
sub finish_response {
    my ($self, $reqstate) = @_;

    my $handle = $reqstate->{hdl};

    delete @{$reqstate}{qw(log request proto)};

    if ($self->{end_loop} || !($reqstate->{keep_alive} > 0)) {
        # no keep-alive (or server is shutting down): disconnect on drain
        $handle->on_drain(sub {
            eval { $self->client_do_disconnect($reqstate); };
            warn $@ if $@;
        });
    } else {
        # print "KEEPALIVE $reqstate->{keep_alive}\n";
        $handle->on_read(sub {
            eval { $self->push_request_header($reqstate); };
            warn $@ if $@;
        });
    }
}
121
# Serialize $resp and queue it for writing on the client handle.
#
# Parameters:
#   $reqstate - per-connection state (uses hdl, keep_alive, log)
#   $resp     - response object providing code/message/content/header
#               methods (an HTTP::Response in all callers in this file)
#   $mtime    - optional modification time; when set a Last-Modified header
#               is sent, otherwise the response is marked as not cacheable
#   $nocomp   - when true the body is never gzip compressed
#
# Bodies larger than 1024 bytes are gzip compressed unless $nocomp is set.
# The request is written to the access log, and finish_response() decides
# whether the connection is kept open.
sub response {
    my ($self, $reqstate, $resp, $mtime, $nocomp) = @_;

    #print "$$: send response: " . Dumper($resp);

    my $code = $resp->code;
    my $msg = $resp->message || HTTP::Status::status_message($code);
    # only the first line of the message may end up in the status line
    ($msg) = $msg =~m/^(.*)$/m;
    my $content = $resp->content;

    if ($code =~ /^(1\d\d|[23]04)$/) {
        # make sure we have no content
        $content = "";
    }
    $content = "" if !defined($content);

    $reqstate->{keep_alive} = 0 if ($code >= 300) || $self->{end_loop};

    $reqstate->{log}->{code} = $code;

    my $res = "HTTP/1.0 $code $msg\015\012";

    # NOTE(review): HTTP::Date is called by its full name but is not loaded
    # explicitly in this file - presumably another module pulls it in; verify.
    my $ctime = time();
    my $date = HTTP::Date::time2str($ctime);
    $resp->header('Date' => $date);
    if ($mtime) {
        $resp->header('Last-Modified' => HTTP::Date::time2str($mtime));
    } else {
        $resp->header('Expires' => $date);
        $resp->header('Cache-Control' => "max-age=0");
        $resp->header("Pragma", "no-cache");
    }

    $resp->header('Server' => "pve-api-daemon/3.0");

    # Test the length, not the truthiness: a body of "0" is real content
    # and needs a Content-Length header like any other body.
    my $content_length = length($content);
    if ($content_length) {

        if (!$nocomp && ($content_length > 1024)) {
            my $comp = Compress::Zlib::memGzip($content);
            $resp->header('Content-Encoding', 'gzip');
            $content = $comp;
            $content_length = length($content);
        }
        $resp->header("Content-Length" => $content_length);
        $reqstate->{log}->{content_length} = $content_length;

    } else {
        $resp->remove_header("Content-Length");
    }

    if ($reqstate->{keep_alive} > 0) {
        $resp->push_header('Connection' => 'Keep-Alive');
    } else {
        $resp->header('Connection' => 'close');
    }

    $res .= $resp->headers_as_string("\015\012");
    #print "SEND(without content) $res\n" if $self->{debug};

    $res .= "\015\012";
    $res .= $content;

    $self->log_request($reqstate);

    $reqstate->{hdl}->push_write($res);
    $self->finish_response($reqstate);
}
191
# Send an error response with the given status code and message.
#
# $hdr and $content are optional and passed straight to HTTP::Response->new.
# Exceptions raised while sending are only warned about.
sub error {
    my ($self, $reqstate, $code, $msg, $hdr, $content) = @_;

    eval {
        $self->response($reqstate, HTTP::Response->new($code, $msg, $hdr, $content));
    };
    if (my $err = $@) {
        warn $err;
    }
}
201
# Read $filename completely and send it as a 200 response.
#
# The content type is derived from the file extension (css, js, png, gif,
# jar). Any failure while reading the file or detecting its type is
# answered with a 501 error response carrying the error text.
sub send_file_start {
    my ($self, $reqstate, $filename) = @_;

    # content types by file extension
    my %content_type_of = (
        css => 'text/css',
        js  => 'application/javascript',
        png => 'image/png',
        gif => 'image/gif',
        jar => 'application/java-archive',
    );

    eval {
        # print "SEND FILE $filename\n";
        # Note: the file is read with a plain sysread, so this is not
        # really async (that would need something like IO::AIO).
        eval {
            my $fh = IO::File->new($filename, '<') || die "$!\n";
            my $stat = File::stat::stat($fh) || die "$!\n";

            my $size = $stat->size;
            my $data;
            my $got = sysread($fh, $data, $size);
            die "got short file\n" if !(defined($got) && $got == $size);

            my ($ext) = $filename =~ m/\.(css|js|png|gif|jar)$/;
            die "unable to detect content type" if !defined($ext);

            my $header = HTTP::Headers->new(Content_Type => $content_type_of{$ext});
            my $resp = HTTP::Response->new(200, "OK", $header, $data);
            $self->response($reqstate, $resp, $stat->mtime);
        };
        if (my $err = $@) {
            $self->error($reqstate, 501, $err);
        }
    };

    warn $@ if $@;
}
245
# Forward the current request to another host and relay its answer.
#
# Parameters:
#   $reqstate - per-connection state, used to send the relayed response
#   $r        - the original request object (not used here)
#   $clientip - client address, forwarded in the PVEClientIP header
#   $host     - target host; 'localhost' is reached via http on port 85,
#               every other host via https on port 8006
#   $method   - HTTP method to use
#   $abs_uri  - absolute URI (path and query) appended to the target
#   $ticket   - optional auth ticket, forwarded as cookie
#   $token    - optional CSRF prevention token
#   $params   - hash ref of parameters; sent form-urlencoded as the request
#               body for POST and PUT
#
# The upstream response is passed on with compression disabled. Errors are
# only warned about.
sub proxy_request {
    my ($self, $reqstate, $r, $clientip, $host, $method, $abs_uri, $ticket, $token, $params) = @_;

    eval {
        my $target;
        if ($host eq 'localhost') {
            $target = "http://$host:85$abs_uri";
        } else {
            $target = "https://$host:8006$abs_uri";
        }

        my $headers = {
            PVEDisableProxy => 'true',
            PVEClientIP => $clientip,
        };

        $headers->{'cookie'} = PVE::REST::create_auth_cookie($ticket) if $ticket;
        $headers->{'CSRFPreventionToken'} = $token if $token;

        my $content;

        if ($method eq 'POST' || $method eq 'PUT') {
            $headers->{'Content-Type'} = 'application/x-www-form-urlencoded';
            # We use a temporary URI object to format
            # the application/x-www-form-urlencoded content.
            my $url = URI->new('http:');
            $url->query_form(%$params);
            $content = $url->query;
            if (defined($content)) {
                $headers->{'Content-Length'} = length($content);
            }
        }

        # $w keeps the request alive until the callback has run
        my $w; $w = http_request(
            $method => $target,
            headers => $headers,
            timeout => 30,
            recurse => 0, # never follow redirects on behalf of the client
            body => $content,
            sub {
                my ($body, $hdr) = @_;

                undef $w;

                eval {
                    # strip the pseudo headers added by AnyEvent::HTTP
                    my $code = delete $hdr->{Status};
                    my $msg = delete $hdr->{Reason};
                    delete $hdr->{URL};
                    delete $hdr->{HTTPVersion};
                    my $header = HTTP::Headers->new(%$hdr);
                    my $resp = HTTP::Response->new($code, $msg, $header, $body);
                    $self->response($reqstate, $resp, undef, 1);
                };
                warn $@ if $@;
            });
    };
    warn $@ if $@;
}
306
# Collect the request parameters of $r into one hash.
#
# Body parameters are only parsed for PUT and POST requests; parameters
# from the query string are merged on top of them and win on conflicts.
# The result is passed through PVE::Tools::decode_utf8_parameters().
my $extract_params = sub {
    my ($r, $method) = @_;

    # NOTE: HTTP::Request::Params return undef instead of ''
    #my $parser = HTTP::Request::Params->new({req => $r});
    #my $params = $parser->params;

    my $has_body = ($method eq 'PUT' || $method eq 'POST');

    my $body_params = $has_body ? CGI->new($r->content())->Vars : {};
    my $query_params = CGI->new($r->url->query)->Vars;

    my $merged = $body_params || {};
    $merged->{$_} = $query_params->{$_} for keys %{$query_params};

    return PVE::Tools::decode_utf8_parameters($merged);
};
329
# Handle a request for the REST API.
#
# Splits the path into relative URI and format, extracts ticket, CSRF token
# and parameters, and calls PVE::REST::rest_handler(). If the handler asks
# for proxying, the request is forwarded with proxy_request() (refused when
# running as trusted_env); otherwise the result is formatted and sent.
# Exceptions are only warned about.
sub handle_api2_request {
    my ($self, $reqstate) = @_;

    eval {
        my $request = $reqstate->{request};
        my $method = $request->method();
        my $path = $request->uri->path();

        my ($rel_uri, $format) = PVE::REST::split_abs_uri($path);
        unless ($format) {
            $self->error($reqstate, HTTP_NOT_IMPLEMENTED, "no such uri");
            return;
        }

        my $rpcenv = $self->{rpcenv};
        my $headers = $request->headers;

        my $token = $headers->header('CSRFPreventionToken');
        my $ticket = PVE::REST::extract_auth_cookie(scalar($headers->header('Cookie')));

        my $params = $extract_params->($request, $method);

        my $clientip = $headers->header('PVEClientIP');

        $rpcenv->init_request(params => $params);

        my $res = PVE::REST::rest_handler($rpcenv, $clientip, $method, $path, $rel_uri, $ticket, $token);

        # todo: eval { $userid = $rpcenv->get_user(); };
        # remember the user for the access log (direct access is faster),
        # then clear it after the request
        $reqstate->{log}->{userid} = $rpcenv->{user};
        $rpcenv->set_user(undef);

        if ($res->{proxy}) {
            if ($self->{trusted_env}) {
                $self->error($reqstate, HTTP_INTERNAL_SERVER_ERROR, "proxy not allowed");
            } else {
                $self->proxy_request($reqstate, $request, $clientip, $res->{proxy}, $method,
                                     $request->uri, $ticket, $token, $res->{proxy_params});
            }
            return;
        }

        PVE::REST::prepare_response_data($format, $res);
        my ($raw, $ct) = PVE::REST::format_response_data($format, $res, $path);

        my $resp = HTTP::Response->new($res->{status}, $res->{message});
        $resp->header("Content-Type" => $ct);
        $resp->content($raw);
        $self->response($reqstate, $resp);

        return;
    };
    warn $@ if $@;
}
392
# Dispatch a completely received request.
#
# Order of dispatch: unknown methods are rejected, paths containing /api2
# go to handle_api2_request(), GET requests for a path registered in
# $self->{pages} are served by that handler (code ref, or hash with a
# 'file' entry), and GET requests below a directory registered in
# $self->{dirs} are served as static files. Everything else, and every
# exception, results in a 501 error response.
sub handle_request {
    my ($self, $reqstate) = @_;

    #print "REQUEST" . Dumper($reqstate->{request});

    eval {
        my $request = $reqstate->{request};
        my $method = $request->method();
        my $path = $request->uri->path();

        # print "REQUEST $path\n";

        if (!$known_methods->{$method}) {
            my $resp = HTTP::Response->new(HTTP_NOT_IMPLEMENTED, "method '$method' not available");
            $self->response($reqstate, $resp);
            return;
        }

        if ($path =~ m!/api2!) {
            $self->handle_api2_request($reqstate);
            return;
        }

        my $is_get = ($method eq 'GET');

        my $handler = ($self->{pages} && $is_get) ? $self->{pages}->{$path} : undef;
        if ($handler) {
            my $kind = ref($handler);
            if ($kind eq 'CODE') {
                my ($resp) = $handler->($self, $reqstate->{request});
                $self->response($reqstate, $resp);
            } elsif ($kind eq 'HASH' && (my $filename = $handler->{file})) {
                my $probe = IO::File->new($filename) ||
                    die "unable to open file '$filename' - $!\n";
                send_file_start($self, $reqstate, $filename);
            } else {
                die "internal error - no handler";
            }
            return;
        }

        # we only allow simple names
        if ($self->{dirs} && $is_get && $path =~ m!^(/\S+/)([a-zA-Z0-9\-\_\.]+)$!) {
            my ($subdir, $file) = ($1, $2);
            if (my $dir = $self->{dirs}->{$subdir}) {
                my $filename = "$dir$file";
                my $probe = IO::File->new($filename) ||
                    die "unable to open file '$filename' - $!\n";
                send_file_start($self, $reqstate, $filename);
                return;
            }
        }

        die "no such file '$path'";
    };
    if (my $err = $@) {
        $self->error($reqstate, 501, $err);
    }
}
454
# Read one request header line and process it.
#
# Calls itself for every further header line. $state carries the header
# (key/val) that is currently being assembled, so that continuation lines
# can be appended before the header is stored in the request. The empty
# line ending the header block triggers reading of the request body (when
# a Content-Length is given) and finally handle_request().
sub unshift_read_header {
    my ($self, $reqstate, $state) = @_;

    $state = {} if !$state;

    $reqstate->{hdl}->unshift_read(line => sub {
        my ($hdl, $line) = @_;

        eval {
            #print "$$: got header: $line\n";

            my $r = $reqstate->{request};
            if ($line eq '') {

                # end of headers - store the last pending header
                $r->push_header($state->{key}, $state->{val})
                    if $state->{key};

                my $conn = $r->header('Connection');

                if ($conn) {
                    $reqstate->{keep_alive} = 0 if $conn =~ m/close/oi;
                } else {
                    # no keep-alive by default before HTTP/1.1
                    if ($reqstate->{proto}->{ver} < 1001) {
                        $reqstate->{keep_alive} = 0;
                    }
                }

                # how much content to read?
                my $te = $r->header('Transfer-Encoding');
                my $len = $r->header('Content-Length');
                my $pveclientip = $r->header('PVEClientIP');

                # fixme: how can we make PVEClientIP header trusted?
                if ($self->{trusted_env} && $pveclientip) {
                    $reqstate->{peer_host} = $pveclientip;
                } else {
                    $r->header('PVEClientIP', $reqstate->{peer_host});
                }

                if ($te && lc($te) eq 'chunked') {
                    # Handle chunked transfer encoding
                    $self->error($reqstate, 501, "chunked transfer encoding not supported");
                } elsif ($te) {
                    $self->error($reqstate, 501, "Unknown transfer encoding '$te'");
                } elsif (defined($len)) {
                    # The length comes straight from the client, so only
                    # accept a plain non-negative integer as chunk size.
                    if ($len =~ m/\A\s*(\d+)\s*\z/) {
                        my $content_len = $1;
                        $reqstate->{hdl}->unshift_read (chunk => $content_len, sub {
                            my ($hdl, $data) = @_;
                            $r->content($data);
                            $self->handle_request($reqstate);
                        });
                    } else {
                        $self->error($reqstate, 400, "invalid Content-Length header");
                    }
                } else {
                    $self->handle_request($reqstate);
                }
            } elsif ($line =~ /^([^:\s]+)\s*:\s*(.*)/) {
                # new header - store the previous one first
                $r->push_header($state->{key}, $state->{val}) if $state->{key};
                ($state->{key}, $state->{val}) = ($1, $2);
                $self->unshift_read_header($reqstate, $state);
            } elsif ($line =~ /^\s+(.*)/) {
                # continuation line of the pending header
                $state->{val} .= " $1";
                $self->unshift_read_header($reqstate, $state);
            } else {
                $self->error($reqstate, 506, "unable to parse request header");
            }
        };
        warn $@ if $@;
    });
}
522
# Queue reading of the next request line on the connection.
#
# A valid request line creates the HTTP::Request object and the log/proto
# state for it and continues with unshift_read_header(). Empty lines are
# skipped; anything else is answered with an error response. Every line
# read decrements the connection's keep_alive counter.
sub push_request_header {
    my ($self, $reqstate) = @_;

    eval {
        $reqstate->{hdl}->push_read(line => sub {
            my ($hdl, $line) = @_;

            eval {
                #print "got request header: $line\n";

                $reqstate->{keep_alive}--;

                my ($method, $uri, $maj, $min) =
                    $line =~ /(\S+)\040(\S+)\040HTTP\/(\d+)\.(\d+)/o;

                if (!defined($method)) {
                    if ($line eq '') {
                        # ignore empty lines before requests (browser bugs?)
                        $self->push_request_header($reqstate);
                    } else {
                        $self->error($reqstate, 400, 'bad request');
                    }
                    return;
                }

                if ($maj != 1) {
                    $self->error($reqstate, 506, "http protocol version $maj.$min not supported");
                    return;
                }

                # only count valid request headers; stop the server loop
                # once max_requests is reached
                $self->{end_loop} = 1
                    if ++$self->{request_count} >= $self->{max_requests};

                $reqstate->{log} = { requestline => $line };
                @{$reqstate->{proto}}{qw(maj min ver)} = ($maj, $min, $maj*1000+$min);
                $reqstate->{request} = HTTP::Request->new($method, $uri);

                $self->unshift_read_header($reqstate);
            };
            warn $@ if $@;
        });
    };
    warn $@ if $@;
}
566
# Accept one pending connection from the listening socket.
#
# Returns the new client file handle (set to non-blocking) and increments
# the connection counter. Returns nothing when the server is shutting
# down or no connection is pending. Dies when the lock cannot be taken or
# accept fails with an error other than "would block".
sub accept {
    my ($self) = @_;

    my $clientfh;

    return if $self->{end_loop};

    # we need to make sure that only one process calls accept
    while (!flock($self->{lockfh}, Fcntl::LOCK_EX())) {
        next if $! == EINTR;
        die "could not get lock on file '$self->{lockfile}' - $!\n";
    }

    my $again = 0;
    my $errmsg;
    eval {
        # retry as long as accept gets interrupted by a signal
        while (!$self->{end_loop} &&
               !defined($clientfh = $self->{socket}->accept()) &&
               ($! == EINTR)) {};

        if ($self->{end_loop}) {
            $again = 0;
        } else {
            $again = ($! == EAGAIN || $! == WSAEWOULDBLOCK);
            if (!defined($clientfh)) {
                $errmsg = "failed to accept connection: $!\n";
            }
        }
    };
    warn $@ if $@;

    flock($self->{lockfh}, Fcntl::LOCK_UN());

    if (!defined($clientfh)) {
        return if $again;
        die $errmsg if $errmsg;
        # Nothing accepted and nothing to report (end_loop was set, or the
        # accept call raised an exception that was warned about above):
        # never continue with an undefined handle.
        return;
    }

    fh_nonblocking $clientfh, 1;

    $self->{conn_count}++;

    return $clientfh;
}
611
# Start the shutdown of the server.
#
# Sets end_loop, drops the watcher on the listening socket and signals
# end_cond as soon as no connection is open any more - immediately, or
# from a timer that checks the connection counter once per second.
sub wait_end_loop {
    my ($self) = @_;

    $self->{end_loop} = 1;

    undef $self->{socket_watch};

    if ($self->{conn_count} > 0) {
        # we need to wait until all open connections gets closed
        my $timer;
        $timer = AnyEvent->timer (after => 1, interval => 1, cb => sub {
            eval {
                # todo: test for active connections instead (we can abort idle connections)
                if ($self->{conn_count} <= 0) {
                    undef $timer;
                    $self->{end_cond}->send(1);
                }
            };
            warn $@ if $@;
        });
    } else {
        $self->{end_cond}->send(1);
        return;
    }
}
636
# Accept all pending connections and start reading requests from them.
#
# For every accepted socket a request state with the configured keep_alive
# counter and the peer address is created and wrapped in an
# AnyEvent::Handle (using TLS when a tls_ctx is configured). EOF and
# errors on the handle log the aborted request and disconnect the client.
# An exception while accepting is logged and ends the server loop.
sub accept_connections {
    my ($self) = @_;

    eval {

        while (my $clientfh = $self->accept()) {

            my $reqstate = { keep_alive => $self->{keep_alive} };

            my $sin = getpeername($clientfh);
            if ($sin) {
                my ($port, $addr) = Socket::unpack_sockaddr_in($sin);
                @{$reqstate}{qw(peer_port peer_host)} = ($port, Socket::inet_ntoa($addr));
            }

            my $on_eof = sub {
                my ($hdl) = @_;
                eval {
                    $self->log_aborted_request($reqstate);
                    $self->client_do_disconnect($reqstate);
                };
                if (my $err = $@) { syslog('err', $err); }
            };

            my $on_error = sub {
                my ($hdl, $fatal, $message) = @_;
                eval {
                    $self->log_aborted_request($reqstate, $message);
                    $self->client_do_disconnect($reqstate);
                };
                if (my $err = $@) { syslog('err', "$err"); }
            };

            my @tls_opts = $self->{tls_ctx}
                ? (tls => "accept", tls_ctx => $self->{tls_ctx})
                : ();

            $reqstate->{hdl} = AnyEvent::Handle->new(
                fh => $clientfh,
                rbuf_max => 32768, # fixme: set smaller max read buffer ?
                timeout => $self->{timeout},
                linger => 0, # avoid problems with ssh - really needed ?
                on_eof => $on_eof,
                on_error => $on_error,
                @tls_opts);

            if ($self->{debug}) {
                print "$$: ACCEPT FH" . $clientfh->fileno() . " CONN$self->{conn_count}\n";
            }

            $self->push_request_header($reqstate);
        }
    };

    if (my $err = $@) {
        syslog('err', $err);
        $self->{end_loop} = 1;
    }

    $self->wait_end_loop() if $self->{end_loop};
}
687
# Open the access log $filename for appending and store the handle in
# $self->{logfh}.
#
# The file is opened under umask 0137; the previous umask is restored in
# every case, also when opening fails. Dies when the file cannot be
# opened.
#
# Note: We can't open log file in non-blocking mode and use AnyEvent::Handle,
# because we write from multiple processes, and that would arbitrarily mix output
# of all processes.
sub open_access_log {
    my ($self, $filename) = @_;

    my $old_mask = umask(0137);
    my $logfh = IO::File->new($filename, ">>");
    my $err = $!; # save errno before anything else can change it
    umask($old_mask);

    die "unable to open log file '$filename' - $err\n" if !$logfh;

    $logfh->autoflush(1);

    $self->{logfh} = $logfh;
}
703
# Append $data to the access log.
#
# Does nothing when no log file is open or $data is empty. When writing
# fails the log handle is dropped, the error is reported to syslog and the
# server loop is asked to terminate.
sub write_log {
    my ($self, $data) = @_;

    my $logfh = $self->{logfh};
    return if !(defined($logfh) && $data);

    unless ($logfh->print($data)) {
        delete $self->{logfh};
        syslog('err', "error writing access log");
        $self->{end_loop} = 1; # terminate asap
    }
}
717
# Create a new server object.
#
# Required arguments: socket (listening socket), lockfh and lockfile (used
# to serialize accept between processes). Optional arguments read here:
# trusted_env, timeout (default 5), keep_alive (default 0), max_conn
# (default 800), max_requests (default 8000), ssl (hash of AnyEvent::TLS
# options) and logfile (access log). All arguments are stored in the object.
#
# Sets up the watchers for the listening socket, the TERM and QUIT
# signals and a 5 second inotify poll timer. Call run() to enter the loop.
sub new {
    my ($this, %args) = @_;

    my $class = ref($this) || $this;

    foreach my $req (qw(socket lockfh lockfile)) {
        die "misssing required argument '$req'" if !defined($args{$req});
    }

    my $self = bless { %args }, $class;

    # init inotify
    PVE::INotify::inotify_init();

    my $atfork = sub { close($self->{socket}); };
    $self->{rpcenv} = PVE::RPCEnvironment->init(
        $self->{trusted_env} ? 'priv' : 'pub', atfork => $atfork);

    fh_nonblocking($self->{socket}, 1);

    $self->{end_loop} = 0;
    $self->{conn_count} = 0;
    $self->{request_count} = 0;
    $self->{timeout} = 5 if !$self->{timeout};
    $self->{keep_alive} = 0 if !defined($self->{keep_alive});
    $self->{max_conn} = 800 if !$self->{max_conn};
    $self->{max_requests} = 8000 if !$self->{max_requests};

    $self->{end_cond} = AnyEvent->condvar;

    if ($self->{ssl}) {
        $self->{tls_ctx} = AnyEvent::TLS->new(%{$self->{ssl}});
    }

    $self->open_access_log($self->{logfile}) if $self->{logfile};

    $self->{socket_watch} = AnyEvent->io(fh => $self->{socket}, poll => 'r', cb => sub {
        eval {
            if ($self->{conn_count} >= $self->{max_conn}) {
                # Connection limit reached: retry once per second. This
                # callback may fire again and again while a connection is
                # pending, so keep at most one retry timer instead of
                # creating a new one on every invocation.
                $self->{accept_retry_watch} ||= AnyEvent->timer (after => 1, interval => 1, cb => sub {
                    if ($self->{conn_count} < $self->{max_conn}) {
                        delete $self->{accept_retry_watch};
                        $self->accept_connections();
                    }
                });
            } else {
                $self->accept_connections();
            }
        };
        warn $@ if $@;
    });

    $self->{term_watch} = AnyEvent->signal(signal => "TERM", cb => sub {
        undef $self->{term_watch};
        $self->wait_end_loop();
    });

    $self->{quit_watch} = AnyEvent->signal(signal => "QUIT", cb => sub {
        undef $self->{quit_watch};
        $self->wait_end_loop();
    });

    $self->{inotify_poll} = AnyEvent->timer(after => 5, interval => 5, cb => sub {
        PVE::INotify::poll(); # read inotify events
    });

    return $self;
}
786
# Run the server: wait on the end_cond condition variable, which
# wait_end_loop() signals once the server is done.
sub run {
    my $self = shift;

    return $self->{end_cond}->recv;
}
792
793 1;