use PVE::INotify;
use PVE::Tools;
use PVE::APIServer::Formatter;
+use PVE::APIServer::Utils;
use Net::IP;
use URI;
use HTTP::Request;
use HTTP::Response;
use Data::Dumper;
+use JSON;
-my $limit_max_headers = 30;
+my $limit_max_headers = 64;
my $limit_max_header_size = 8*1024;
my $limit_max_post = 64*1024;
return wantarray ? ($rel_uri, $format) : $rel_uri;
};
+sub dprint {
+ my ($self, $message) = @_;
+
+ return if !$self->{debug};
+
+ my ($pkg, $pkgfile, $line, $sub) = caller(1);
+ $sub =~ s/^(?:.+::)+//;
+ print "worker[$$]: $pkg +$line: $sub: $message\n";
+}
+
sub log_request {
my ($self, $reqstate) = @_;
return;
}
- print "close connection $hdl\n" if $self->{debug};
+ $self->dprint("close connection $hdl");
&$shutdown_hdl($hdl);
+ warn "connection count <= 0!\n" if $self->{conn_count} <= 0;
+
$self->{conn_count}--;
- print "$$: CLOSE FH" . $hdl->{fh}->fileno() . " CONN$self->{conn_count}\n" if $self->{debug};
+ $self->dprint("CLOSE FH" . $hdl->{fh}->fileno() . " CONN$self->{conn_count}");
}
sub finish_response {
}
}
+sub response_stream {
+ my ($self, $reqstate, $stream_fh) = @_;
+
+ # disable timeout, we don't know how big the data is
+ $reqstate->{hdl}->timeout(0);
+
+ my $buf_size = 4*1024*1024;
+
+ my $on_read;
+ $on_read = sub {
+ my ($hdl) = @_;
+ my $reqhdl = $reqstate->{hdl};
+ return if !$reqhdl;
+
+ my $wbuf_len = length($reqhdl->{wbuf});
+ my $rbuf_len = length($hdl->{rbuf});
+ # TODO: Take into account $reqhdl->{wbuf_max} ? Right now
+ # that's unbounded, so just assume $buf_size
+ my $to_read = $buf_size - $wbuf_len;
+ $to_read = $rbuf_len if $rbuf_len < $to_read;
+ if ($to_read > 0) {
+ my $data = substr($hdl->{rbuf}, 0, $to_read, '');
+ $reqhdl->push_write($data);
+ $rbuf_len -= $to_read;
+ } elsif ($hdl->{_eof}) {
+ # workaround: AnyEvent gives us a fake EPIPE if we don't consume
+ # any data when called at EOF, so unregister ourselves - data is
+ # flushed by on_eof anyway
+ # see: https://sources.debian.org/src/libanyevent-perl/7.170-2/lib/AnyEvent/Handle.pm/#L1329
+ $hdl->on_read();
+ return;
+ }
+
+ # apply backpressure so we don't accept any more data into
+ # buffer if the client isn't downloading fast enough
+ # note: read_size can double upon read, and we also need to
+ # account for one more read after start_read, so *4
+ if ($rbuf_len + $hdl->{read_size}*4 > $buf_size) {
+ # stop reading until write buffer is empty
+ $hdl->on_read();
+ my $prev_on_drain = $reqhdl->{on_drain};
+ $reqhdl->on_drain(sub {
+ my ($wrhdl) = @_;
+ # on_drain called because write buffer is empty, continue reading
+ $hdl->on_read($on_read);
+ if ($prev_on_drain) {
+ $wrhdl->on_drain($prev_on_drain);
+ $prev_on_drain->($wrhdl);
+ }
+ });
+ }
+ };
+
+ $reqstate->{proxyhdl} = AnyEvent::Handle->new(
+ fh => $stream_fh,
+ rbuf_max => $buf_size,
+ timeout => 0,
+ on_read => $on_read,
+ on_eof => sub {
+ my ($hdl) = @_;
+ eval {
+ if (my $reqhdl = $reqstate->{hdl}) {
+ $self->log_aborted_request($reqstate);
+ # write out any remaining data
+ $reqhdl->push_write($hdl->{rbuf}) if length($hdl->{rbuf}) > 0;
+ $hdl->{rbuf} = "";
+ $reqhdl->push_shutdown();
+ $self->finish_response($reqstate);
+ }
+ };
+ if (my $err = $@) { syslog('err', "$err"); }
+ $on_read = undef;
+ },
+ on_error => sub {
+ my ($hdl, $fatal, $message) = @_;
+ eval {
+ $self->log_aborted_request($reqstate, $message);
+ $self->client_do_disconnect($reqstate);
+ };
+ if (my $err = $@) { syslog('err', "$err"); }
+ $on_read = undef;
+ },
+ );
+}
+
sub response {
- my ($self, $reqstate, $resp, $mtime, $nocomp, $delay) = @_;
+ my ($self, $reqstate, $resp, $mtime, $nocomp, $delay, $stream_fh) = @_;
#print "$$: send response: " . Dumper($resp);
$resp->header('Server' => "pve-api-daemon/3.0");
my $content_length;
- if ($content) {
+ if ($content && !$stream_fh) {
$content_length = length($content);
#print "SEND(without content) $res\n" if $self->{debug};
$res .= "\015\012";
- $res .= $content if $content;
+ $res .= $content if $content && !$stream_fh;
$self->log_request($reqstate, $reqstate->{request});
- if ($delay && $delay > 0) {
+ if ($stream_fh) {
+ # write headers and preamble...
+ $reqstate->{hdl}->push_write($res);
+ # ...then stream data via an AnyEvent::Handle
+ $self->response_stream($reqstate, $stream_fh);
+ } elsif ($delay && $delay > 0) {
my $w; $w = AnyEvent->timer(after => $delay, cb => sub {
undef $w; # delete reference
$reqstate->{hdl}->push_write($res);
my $mime;
if (ref($download) eq 'HASH') {
- $fh = $download->{fh};
$mime = $download->{'content-type'};
+
+ if ($download->{path} && $download->{stream} &&
+ $reqstate->{request}->header('PVEDisableProxy'))
+ {
+ # avoid double stream from a file, let the proxy handle it
+ die "internal error: file proxy streaming only available for pvedaemon\n"
+ if !$self->{trusted_env};
+ my $header = HTTP::Headers->new(
+ pvestreamfile => $download->{path},
+ Content_Type => $mime,
+ );
+ # we need some data so Content-Length gets set correctly and
+ # the proxy doesn't wait for more data - place a canary
+ my $resp = HTTP::Response->new(200, "OK", $header, "error canary");
+ $self->response($reqstate, $resp);
+ return;
+ }
+
+ if (!($fh = $download->{fh})) {
+ my $path = $download->{path};
+ die "internal error: {download} returned but neither fh not path given\n"
+ if !$path;
+ sysopen($fh, "$path", O_NONBLOCK | O_RDONLY)
+ or die "open stream path '$path' for reading failed: $!\n";
+ }
+
+ if ($download->{stream}) {
+ my $header = HTTP::Headers->new(Content_Type => $mime);
+ my $resp = HTTP::Response->new(200, "OK", $header);
+ $self->response($reqstate, $resp, undef, 1, 0, $fh);
+ return;
+ }
} else {
my $filename = $download;
$fh = IO::File->new($filename, '<') ||
my ($fh) = @_
or die "connect to '$remhost:$remport' failed: $!";
- print "$$: CONNECTed to '$remhost:$remport'\n" if $self->{debug};
+ $self->dprint("CONNECTed to '$remhost:$remport'");
$reqstate->{proxyhdl} = AnyEvent::Handle->new(
fh => $fh,
$reqstate->{proxyhdl}->push_write($payload) if $reqstate->{proxyhdl};
} elsif ($opcode == 8) {
my $statuscode = unpack ("n", $payload);
- print "websocket received close. status code: '$statuscode'\n" if $self->{debug};
- if ($reqstate->{proxyhdl}) {
- $reqstate->{proxyhdl}->push_shutdown();
- }
+ $self->dprint("websocket received close. status code: '$statuscode'");
+ if ($reqstate->{proxyhdl}) {
+ $reqstate->{proxyhdl}->push_shutdown();
+ }
$hdl->push_shutdown();
+ } elsif ($opcode == 9) {
+ # ping received, schedule pong
+ $reqstate->{hdl}->push_write($encode->(\$payload, "\x8A")) if $reqstate->{hdl};
+ } elsif ($opcode == 0xA) {
+ # pong received, continue
} else {
die "received unhandled websocket opcode $opcode\n";
}
"Sec-WebSocket-Protocol: $wsproto\015\012" .
"\015\012";
- print $res if $self->{debug};
+ $self->dprint($res);
$reqstate->{hdl}->push_write($res);
eval {
my $code = delete $hdr->{Status};
my $msg = delete $hdr->{Reason};
+ my $stream = delete $hdr->{pvestreamfile};
delete $hdr->{URL};
delete $hdr->{HTTPVersion};
my $header = HTTP::Headers->new(%$hdr);
$location =~ s|^http://localhost:85||;
$header->header(Location => $location);
}
- my $resp = HTTP::Response->new($code, $msg, $header, $body);
- # Note: disable compression, because body is already compressed
- $self->response($reqstate, $resp, undef, 1);
+ if ($stream) {
+ sysopen(my $fh, "$stream", O_NONBLOCK | O_RDONLY)
+ or die "open stream path '$stream' for forwarding failed: $!\n";
+ my $resp = HTTP::Response->new($code, $msg, $header, undef);
+ $self->response($reqstate, $resp, undef, 1, 0, $fh);
+ } else {
+ my $resp = HTTP::Response->new($code, $msg, $header, $body);
+ # Note: disable compression, because body is already compressed
+ $self->response($reqstate, $resp, undef, 1);
+ }
};
warn $@ if $@;
});
my $params = {};
if ($method eq 'PUT' || $method eq 'POST') {
- $params = decode_urlencoded($r->content);
+ my $ct;
+ if (my $ctype = $r->header('Content-Type')) {
+ $ct = parse_content_type($ctype);
+ }
+ if (defined($ct) && $ct eq 'application/json') {
+ $params = decode_json($r->content);
+ } else {
+ $params = decode_urlencoded($r->content);
+ }
}
my $query_params = decode_urlencoded($r->url->query());
$delay = 0 if $delay < 0;
}
- if (defined(my $download = $res->{download})) {
+ my $download = $res->{download};
+ $download //= $res->{data}->{download}
+ if defined($res->{data}) && ref($res->{data}) eq 'HASH';
+ if (defined($download)) {
send_file_start($self, $reqstate, $download);
return;
}
if ($node ne 'localhost' && PVE::INotify::nodename() !~ m/^$node$/i) {
$remip = $self->remote_node_ip($node);
- print "REMOTE CONNECT $vmid, $remip, $connect_str\n" if $self->{debug};
+ $self->dprint("REMOTE CONNECT $vmid, $remip, $connect_str");
} else {
- print "$$: CONNECT $vmid, $node, $spiceport\n" if $self->{debug};
+ $self->dprint("CONNECT $vmid, $node, $spiceport");
}
if ($remip && $r->header('PVEDisableProxy')) {
my ($fh) = @_
or die "connect to '$remhost:$remport' failed: $!";
- print "$$: CONNECTed to '$remhost:$remport'\n" if $self->{debug};
+ $self->dprint("CONNECTed to '$remhost:$remport'");
$reqstate->{proxyhdl} = AnyEvent::Handle->new(
fh => $fh,
rbuf_max => 64*1024,
eval {
# print "$$: got header: $line\n" if $self->{debug};
- die "to many http header lines\n" if ++$state->{count} >= $limit_max_headers;
+ die "too many http header lines (> $limit_max_headers)\n" if ++$state->{count} >= $limit_max_headers;
die "http header too large\n" if ($state->{size} += length($line)) >= $limit_max_header_size;
my $r = $reqstate->{request};
}
my $ctype = $r->header('Content-Type');
- my ($ct, $boundary) = parse_content_type($ctype) if $ctype;
+ my ($ct, $boundary);
+ ($ct, $boundary)= parse_content_type($ctype) if $ctype;
if ($auth->{isUpload} && !$self->{trusted_env}) {
die "upload 'Content-Type '$ctype' not implemented\n"
die "upload without content length header not supported" if !$len;
- print "start upload $path $ct $boundary\n" if $self->{debug};
+ $self->dprint("start upload $path $ct $boundary");
my $tmpfilename = get_upload_filename();
my $outfh = IO::File->new($tmpfilename, O_RDWR|O_CREAT|O_EXCL, 0600) ||
return;
}
- if (!$ct || $ct eq 'application/x-www-form-urlencoded') {
+ if (!$ct || $ct eq 'application/x-www-form-urlencoded' || $ct eq 'application/json') {
$reqstate->{hdl}->unshift_read(chunk => $len, sub {
my ($hdl, $data) = @_;
$r->content($data);
fh_nonblocking $clientfh, 1;
- $self->{conn_count}++;
-
return $clientfh;
}
sub check_host_access {
my ($self, $clientip) = @_;
+ $clientip = PVE::APIServer::Utils::normalize_v4_in_v6($clientip);
my $cip = Net::IP->new($clientip);
+ if (!$cip) {
+ $self->dprint("client IP not parsable: $@");
+ return 0;
+ }
+
my $match_allow = 0;
my $match_deny = 0;
foreach my $t (@{$self->{allow_from}}) {
if ($t->overlaps($cip)) {
$match_allow = 1;
+ $self->dprint("client IP allowed: ". $t->prefix());
last;
}
}
if ($self->{deny_from}) {
foreach my $t (@{$self->{deny_from}}) {
if ($t->overlaps($cip)) {
+ $self->dprint("client IP denied: ". $t->prefix());
$match_deny = 1;
last;
}
sub accept_connections {
my ($self) = @_;
+ my ($clientfh, $handle_creation);
eval {
- while (my $clientfh = $self->accept()) {
+ while ($clientfh = $self->accept()) {
my $reqstate = { keep_alive => $self->{keep_alive} };
# stop keep-alive when there are many open connections
- if ($self->{conn_count} >= $self->{max_conn_soft_limit}) {
+ if ($self->{conn_count} + 1 >= $self->{max_conn_soft_limit}) {
$reqstate->{keep_alive} = 0;
}
if (my $sin = getpeername($clientfh)) {
my ($pfamily, $pport, $phost) = PVE::Tools::unpack_sockaddr_in46($sin);
($reqstate->{peer_port}, $reqstate->{peer_host}) = ($pport, Socket::inet_ntop($pfamily, $phost));
+ } else {
+ $self->dprint("getpeername failed: $!");
+ close($clientfh);
+ next;
}
if (!$self->{trusted_env} && !$self->check_host_access($reqstate->{peer_host})) {
- print "$$: ABORT request from $reqstate->{peer_host} - access denied\n" if $self->{debug};
+ $self->dprint("ABORT request from $reqstate->{peer_host} - access denied");
$reqstate->{log}->{code} = 403;
$self->log_request($reqstate);
+ close($clientfh);
next;
}
+ # Increment conn_count before creating new handle, since creation
+ # triggers callbacks, which can potentialy decrement (e.g.
+ # on_error) conn_count before AnyEvent::Handle->new() returns.
+ $handle_creation = 1;
+ $self->{conn_count}++;
$reqstate->{hdl} = AnyEvent::Handle->new(
fh => $clientfh,
rbuf_max => 64*1024,
if (my $err = $@) { syslog('err', "$err"); }
},
($self->{tls_ctx} ? (tls => "accept", tls_ctx => $self->{tls_ctx}) : ()));
+ $handle_creation = 0;
- print "$$: ACCEPT FH" . $clientfh->fileno() . " CONN$self->{conn_count}\n" if $self->{debug};
+ $self->dprint("ACCEPT FH" . $clientfh->fileno() . " CONN$self->{conn_count}");
$self->push_request_header($reqstate);
}
if (my $err = $@) {
syslog('err', $err);
+ $self->dprint("connection accept error: $err");
+ close($clientfh);
+ if ($handle_creation) {
+ if ($self->{conn_count} <= 0) {
+ warn "connection count <= 0 not decrementing!\n";
+ } else {
+ $self->{conn_count}--;
+ }
+ }
$self->{end_loop} = 1;
}