use Carp;
use JSON;
use Socket;
-
+use PVE::Tools;
+use PVE::INotify;
+use PVE::RPCEnvironment;
+
require Exporter;
+my $rados_default_timeout = 5;
+my $ceph_default_conf = '/etc/ceph/ceph.conf';
+my $ceph_default_user = 'admin';
+
+
our @ISA = qw(Exporter);
# Items to export into callers namespace by default. Note: do not export
require XSLoader;
XSLoader::load('PVE::RADOS', $VERSION);
-# fixme: timeouts??
-
my $writedata = sub {
my ($fh, $cmd, $data) = @_;
local $SIG{PIPE} = 'IGNORE';
-
+
my $bin = pack "a L/a*", $cmd, $data || '';
my $res = syswrite $fh, $bin;
};
my $readdata = sub {
- my ($fh, $expect_result) = @_;
+ my ($fh, $allow_eof) = @_;
my $head = '';
while (length($head) < 5) {
last if !sysread $fh, $head, 5 - length($head), length($head);
}
+ return undef if $allow_eof && length($head) == 0;
+
die "partial read\n" if length($head) < 5;
-
+
my ($cmd, $len) = unpack "a L", $head;
my $data = '';
}
die "partial data read\n" if length($data) < $len;
- if ($expect_result) {
- die $data if $cmd eq 'E' && $data;
- die "got unexpected result\n" if $cmd ne '>';
+ return wantarray ? ($cmd, $data) : $data;
+};
+
+my $kill_worker = sub {
+ my ($self) = @_;
+
+ return if !$self->{cpid};
+ return if $self->{__already_killed};
+
+ $self->{__already_killed} = 1;
+
+ close($self->{child}) if defined($self->{child});
+
+ # only kill if we created the process
+ return if $self->{pid} != $$;
+
+ kill(9, $self->{cpid});
+ waitpid($self->{cpid}, 0);
+};
+
+my $sendcmd = sub {
+ my ($self, $cmd, $data, $expect_tag) = @_;
+
+ $expect_tag = '>' if !$expect_tag;
+
+ die "detected forked connection" if $self->{pid} != $$;
+
+ my ($restag, $raw);
+ my $code = sub {
+ &$writedata($self->{child}, $cmd, $data) if $expect_tag ne 'S';
+ ($restag, $raw) = &$readdata($self->{child});
+ };
+ eval { PVE::Tools::run_with_timeout($self->{timeout}, $code); };
+ if (my $err = $@) {
+ &$kill_worker($self);
+ die $err;
+ }
+ if ($restag eq 'E') {
+ die $raw if $raw;
+ die "unknown error\n";
}
- return wantarray ? ($cmd, $data) : $data;
+ die "got unexpected result\n" if $restag ne $expect_tag;
+
+ return $raw;
};
sub new {
my ($class, %params) = @_;
+ my $rpcenv = PVE::RPCEnvironment::get();
+
socketpair(my $child, my $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
|| die "socketpair: $!";
my $self = bless {};
+ my $timeout = delete $params{timeout} || $rados_default_timeout;
+
+ $self->{timeout} = $timeout;
+ $self->{pid} = $$;
+
if ($cpid) { # parent
close $parent;
-
+
$self->{cpid} = $cpid;
$self->{child} = $child;
- # wait for sync
- my ($cmd, $msg) = &$readdata($child);
- die $msg if $cmd eq 'E';
- die "internal error- got unexpected result" if $cmd ne 'S';
+ &$sendcmd($self, undef, undef, 'S'); # wait for sync
} else { # child
$0 = 'pverados';
-
+
+ PVE::INotify::inotify_close();
+
+ if (my $atfork = $rpcenv->{atfork}) {
+ &$atfork();
+ }
+
# fixme: timeout?
close $child;
- my $timeout = delete $params{timeout} || 5;
-
my $conn;
eval {
- $conn = pve_rados_create() ||
+ my $ceph_user = delete $params{userid} || $ceph_default_user;
+ $conn = pve_rados_create($ceph_user) ||
die "unable to create RADOS object\n";
+ if (defined($params{ceph_conf}) && (!-e $params{ceph_conf})) {
+ die "Supplied ceph config doesn't exist, $params{ceph_conf}";
+ }
+
+ my $ceph_conf = delete $params{ceph_conf} || $ceph_default_conf;
+
+ if (-e $ceph_conf) {
+ pve_rados_conf_read_file($conn, $ceph_conf);
+ }
+
pve_rados_conf_set($conn, 'client_mount_timeout', $timeout);
foreach my $k (keys %params) {
$self->{conn} = $conn;
for (;;) {
- my ($cmd, $data) = &$readdata($parent);
-
- last if $cmd eq 'Q';
+ my ($cmd, $data) = &$readdata($parent, 1);
+
+ last if !$cmd || $cmd eq 'Q';
my $res;
eval {
}
&$writedata($parent, '>', $res);
}
-
+
exit(0);
}
return $self;
}
+sub timeout {
+ my ($self, $new_timeout) = @_;
+
+ $self->{timeout} = $new_timeout if $new_timeout;
+
+ return $self->{timeout};
+}
+
sub DESTROY {
my ($self) = @_;
if ($self->{cpid}) {
#print "$$: DESTROY WAIT0\n";
- eval { &$writedata($self->{child}, 'Q'); };
- my $res = waitpid($self->{cpid}, 0);
- #print "$$: DESTROY WAIT $res\n";
+ &$kill_worker($self);
+ #print "$$: DESTROY WAIT\n";
} else {
#print "$$: DESTROY SHUTDOWN\n";
pve_rados_shutdown($self->{conn}) if $self->{conn};
if ($self->{cpid}) {
my $data = encode_json(['cluster_stat', @args]);
- &$writedata($self->{child}, 'C', $data);
- return decode_json(&$readdata($self->{child}, 1));
+ my $raw = &$sendcmd($self, 'C', $data);
+ return decode_json($raw);
} else {
return pve_rados_cluster_stat($self->{conn});
}
my $json = encode_json($cmd);
- &$writedata($self->{child}, 'M', $json);
-
- my $raw = &$readdata($self->{child}, 1);
+ my $raw = &$sendcmd($self, 'M', $json);
if ($cmd->{format} && $cmd->{format} eq 'json') {
return length($raw) ? decode_json($raw) : undef;