use PVE::INotify;
use PVE::Tools;
use PVE::APIServer::Formatter;
+use PVE::APIServer::Utils;
use Net::IP;
use URI;
}
}
+sub response_stream {
+ my ($self, $reqstate, $stream_fh) = @_;
+
+ # disable timeout, we don't know how big the data is
+ $reqstate->{hdl}->timeout(0);
+
+ my $buf_size = 4*1024*1024;
+
+ my $on_read;
+ $on_read = sub {
+ my ($hdl) = @_;
+ my $reqhdl = $reqstate->{hdl};
+ return if !$reqhdl;
+
+ my $wbuf_len = length($reqhdl->{wbuf});
+ my $rbuf_len = length($hdl->{rbuf});
+ # TODO: Take into account $reqhdl->{wbuf_max} ? Right now
+ # that's unbounded, so just assume $buf_size
+ my $to_read = $buf_size - $wbuf_len;
+ $to_read = $rbuf_len if $rbuf_len < $to_read;
+ if ($to_read > 0) {
+ my $data = substr($hdl->{rbuf}, 0, $to_read, '');
+ $reqhdl->push_write($data);
+ $rbuf_len -= $to_read;
+ } elsif ($hdl->{_eof}) {
+ # workaround: AnyEvent gives us a fake EPIPE if we don't consume
+ # any data when called at EOF, so unregister ourselves - data is
+ # flushed by on_eof anyway
+ # see: https://sources.debian.org/src/libanyevent-perl/7.170-2/lib/AnyEvent/Handle.pm/#L1329
+ $hdl->on_read();
+ return;
+ }
+
+ # apply backpressure so we don't accept any more data into
+ # buffer if the client isn't downloading fast enough
+ # note: read_size can double upon read, and we also need to
+ # account for one more read after start_read, so *4
+ if ($rbuf_len + $hdl->{read_size}*4 > $buf_size) {
+ # stop reading until write buffer is empty
+ $hdl->on_read();
+ my $prev_on_drain = $reqhdl->{on_drain};
+ $reqhdl->on_drain(sub {
+ my ($wrhdl) = @_;
+ # on_drain called because write buffer is empty, continue reading
+ $hdl->on_read($on_read);
+ if ($prev_on_drain) {
+ $wrhdl->on_drain($prev_on_drain);
+ $prev_on_drain->($wrhdl);
+ }
+ });
+ }
+ };
+
+ $reqstate->{proxyhdl} = AnyEvent::Handle->new(
+ fh => $stream_fh,
+ rbuf_max => $buf_size,
+ timeout => 0,
+ on_read => $on_read,
+ on_eof => sub {
+ my ($hdl) = @_;
+ eval {
+ if (my $reqhdl = $reqstate->{hdl}) {
+ $self->log_aborted_request($reqstate);
+ # write out any remaining data
+ $reqhdl->push_write($hdl->{rbuf}) if length($hdl->{rbuf}) > 0;
+ $hdl->{rbuf} = "";
+ $reqhdl->push_shutdown();
+ $self->finish_response($reqstate);
+ }
+ };
+ if (my $err = $@) { syslog('err', "$err"); }
+ $on_read = undef;
+ },
+ on_error => sub {
+ my ($hdl, $fatal, $message) = @_;
+ eval {
+ $self->log_aborted_request($reqstate, $message);
+ $self->client_do_disconnect($reqstate);
+ };
+ if (my $err = $@) { syslog('err', "$err"); }
+ $on_read = undef;
+ },
+ );
+}
+
sub response {
- my ($self, $reqstate, $resp, $mtime, $nocomp, $delay) = @_;
+ my ($self, $reqstate, $resp, $mtime, $nocomp, $delay, $stream_fh) = @_;
#print "$$: send response: " . Dumper($resp);
$resp->header('Server' => "pve-api-daemon/3.0");
my $content_length;
- if ($content) {
+ if ($content && !$stream_fh) {
$content_length = length($content);
#print "SEND(without content) $res\n" if $self->{debug};
$res .= "\015\012";
- $res .= $content if $content;
+ $res .= $content if $content && !$stream_fh;
$self->log_request($reqstate, $reqstate->{request});
- if ($delay && $delay > 0) {
+ if ($stream_fh) {
+ # write headers and preamble...
+ $reqstate->{hdl}->push_write($res);
+ # ...then stream data via an AnyEvent::Handle
+ $self->response_stream($reqstate, $stream_fh);
+ } elsif ($delay && $delay > 0) {
my $w; $w = AnyEvent->timer(after => $delay, cb => sub {
undef $w; # delete reference
$reqstate->{hdl}->push_write($res);
my $mime;
if (ref($download) eq 'HASH') {
- $fh = $download->{fh};
$mime = $download->{'content-type'};
+
+ if ($download->{path} && $download->{stream} &&
+ $reqstate->{request}->header('PVEDisableProxy'))
+ {
+ # avoid double stream from a file, let the proxy handle it
+ die "internal error: file proxy streaming only available for pvedaemon\n"
+ if !$self->{trusted_env};
+ my $header = HTTP::Headers->new(
+ pvestreamfile => $download->{path},
+ Content_Type => $mime,
+ );
+ # we need some data so Content-Length gets set correctly and
+ # the proxy doesn't wait for more data - place a canary
+ my $resp = HTTP::Response->new(200, "OK", $header, "error canary");
+ $self->response($reqstate, $resp);
+ return;
+ }
+
+ if (!($fh = $download->{fh})) {
+ my $path = $download->{path};
+ die "internal error: {download} returned but neither fh not path given\n"
+ if !$path;
+ sysopen($fh, "$path", O_NONBLOCK | O_RDONLY)
+ or die "open stream path '$path' for reading failed: $!\n";
+ }
+
+ if ($download->{stream}) {
+ my $header = HTTP::Headers->new(Content_Type => $mime);
+ my $resp = HTTP::Response->new(200, "OK", $header);
+ $self->response($reqstate, $resp, undef, 1, 0, $fh);
+ return;
+ }
} else {
my $filename = $download;
$fh = IO::File->new($filename, '<') ||
eval {
my $code = delete $hdr->{Status};
my $msg = delete $hdr->{Reason};
+ my $stream = delete $hdr->{pvestreamfile};
delete $hdr->{URL};
delete $hdr->{HTTPVersion};
my $header = HTTP::Headers->new(%$hdr);
$location =~ s|^http://localhost:85||;
$header->header(Location => $location);
}
- my $resp = HTTP::Response->new($code, $msg, $header, $body);
- # Note: disable compression, because body is already compressed
- $self->response($reqstate, $resp, undef, 1);
+ if ($stream) {
+ sysopen(my $fh, "$stream", O_NONBLOCK | O_RDONLY)
+ or die "open stream path '$stream' for forwarding failed: $!\n";
+ my $resp = HTTP::Response->new($code, $msg, $header, undef);
+ $self->response($reqstate, $resp, undef, 1, 0, $fh);
+ } else {
+ my $resp = HTTP::Response->new($code, $msg, $header, $body);
+ # Note: disable compression, because body is already compressed
+ $self->response($reqstate, $resp, undef, 1);
+ }
};
warn $@ if $@;
});
sub check_host_access {
my ($self, $clientip) = @_;
+ $clientip = PVE::APIServer::Utils::normalize_v4_in_v6($clientip);
my $cip = Net::IP->new($clientip);
if (!$cip) {