]> git.proxmox.com Git - librados2-perl.git/blobdiff - PVE/RADOS.pm
allow to specify the userid with rados_create
[librados2-perl.git] / PVE / RADOS.pm
index f607fea2130b88e97c7854facdc36408fe6dbb66..d53f655a9646b62fa31d51bc2af67644eea28b22 100644 (file)
@@ -5,9 +5,18 @@ use strict;
 use warnings;
 use Carp;
 use JSON;
+use Socket;
+use PVE::Tools;
+use PVE::INotify;
+use PVE::RPCEnvironment;
 
 require Exporter;
 
+my $rados_default_timeout = 5;
+my $ceph_default_conf = '/etc/ceph/ceph.conf';
+my $ceph_default_user = 'admin';
+
+
 our @ISA = qw(Exporter);
 
 # Items to export into callers namespace by default. Note: do not export
@@ -32,47 +41,232 @@ our $VERSION = '1.0';
 require XSLoader;
 XSLoader::load('PVE::RADOS', $VERSION);
 
+my $writedata = sub {
+    my ($fh, $cmd, $data) = @_;
+
+    local $SIG{PIPE} = 'IGNORE';
+
+    my $bin = pack "a L/a*", $cmd, $data || '';
+    my $res = syswrite $fh, $bin;
+
+    die "write data failed - $!\n" if !defined($res);
+};
+
+my $readdata = sub {
+    my ($fh, $allow_eof) = @_;
+
+    my $head = '';
+
+    local $SIG{PIPE} = 'IGNORE';
+
+    while (length($head) < 5) {
+       last if !sysread $fh, $head, 5 - length($head), length($head);
+    }
+    return undef if $allow_eof && length($head) == 0;
+
+    die "partial read\n" if length($head) < 5;
+
+    my ($cmd, $len) = unpack "a L", $head;
+
+    my $data = '';
+    while (length($data) < $len) {
+       last if !sysread $fh, $data, $len - length($data), length($data);
+    }
+    die "partial data read\n" if length($data) < $len;
+
+    return wantarray ? ($cmd, $data) : $data;
+};
+
+my $kill_worker = sub {
+    my ($self) = @_;
+
+    return if !$self->{cpid};
+    return if  $self->{__already_killed};
+
+    $self->{__already_killed} = 1;
+
+    close($self->{child}) if defined($self->{child});
+
+    # only kill if we created the process
+    return if $self->{pid} != $$;
+
+    kill(9, $self->{cpid});
+    waitpid($self->{cpid}, 0);
+};
+
+my $sendcmd = sub {
+    my ($self, $cmd, $data, $expect_tag) = @_;
+
+    $expect_tag = '>' if !$expect_tag;
+
+    die "detected forked connection" if $self->{pid} != $$;
+
+    my ($restag, $raw);
+    my $code = sub {
+       &$writedata($self->{child}, $cmd, $data) if $expect_tag ne 'S';
+       ($restag, $raw) = &$readdata($self->{child});
+    };
+    eval { PVE::Tools::run_with_timeout($self->{timeout}, $code); };
+    if (my $err = $@) {
+       &$kill_worker($self);
+       die $err;
+    }
+    if ($restag eq 'E') {
+       die $raw if $raw;
+       die "unknown error\n";
+    }
+
+    die "got unexpected result\n" if $restag ne $expect_tag;
+
+    return $raw;
+};
+
 sub new {
     my ($class, %params) = @_;
 
-    my $conn = pve_rados_create() ||
-       die "unable to create RADOS object\n";
+    my $rpcenv = PVE::RPCEnvironment::get();
 
-    my $timeout = delete $params{timeout} || 5;
+    socketpair(my $child, my $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
+       ||  die "socketpair: $!";
 
-    pve_rados_conf_set($conn, 'client_mount_timeout', $timeout);
+    my $cpid = fork();
 
-    foreach my $k (keys %params) {
-       pve_rados_conf_set($conn, $k, $params{$k});
-    }
+    die "unable to fork - $!\n" if !defined($cpid);
 
-    pve_rados_connect($conn);
+    my $self = bless {};
 
-    my $self = bless { conn =>  $conn };
+    my $timeout = delete $params{timeout} || $rados_default_timeout;
+
+    $self->{timeout} = $timeout;
+    $self->{pid} = $$;
+
+    if ($cpid) { # parent
+       close $parent;
+
+       $self->{cpid} = $cpid;
+       $self->{child} = $child;
+
+       &$sendcmd($self, undef, undef, 'S'); # wait for sync
+
+    } else { # child
+       $0 = 'pverados';
+
+       PVE::INotify::inotify_close();
+
+       if (my $atfork = $rpcenv->{atfork}) {
+           &$atfork();
+       }
+
+       # fixme: timeout?
+
+       close $child;
+
+       my $conn;
+       eval {
+           my $ceph_user = delete $params{userid} || $ceph_default_user;
+           $conn = pve_rados_create($ceph_user) ||
+               die "unable to create RADOS object\n";
+
+           if (defined($params{ceph_conf}) && (!-e $params{ceph_conf})) {
+               die "Supplied ceph config doesn't exist, $params{ceph_conf}";
+           }
+
+           my $ceph_conf = delete $params{ceph_conf} || $ceph_default_conf;
+
+           if (-e $ceph_conf) {
+               pve_rados_conf_read_file($conn, $ceph_conf);
+           }
+
+           pve_rados_conf_set($conn, 'client_mount_timeout', $timeout);
+
+           foreach my $k (keys %params) {
+               pve_rados_conf_set($conn, $k, $params{$k});
+           }
+
+           pve_rados_connect($conn);
+       };
+       if (my $err = $@) {
+           &$writedata($parent, 'E', $err);
+           die $err;
+       }
+       &$writedata($parent, 'S');
+
+       $self->{conn} = $conn;
+
+       for (;;) {
+           my ($cmd, $data) = &$readdata($parent, 1);
+
+           last if !$cmd || $cmd eq 'Q';
+
+           my $res;
+           eval {
+               if ($cmd eq 'M') { # rados monitor commands
+                   $res = pve_rados_mon_command($self->{conn}, [ $data ]);
+               } elsif ($cmd eq 'C') { # class methods
+                   my $aref = decode_json($data);
+                   my $method = shift @$aref;
+                   $res = encode_json($self->$method(@$aref));
+               } else {
+                   die "invalid command\n";
+               }
+           };
+           if (my $err = $@) {
+               &$writedata($parent, 'E', $err);
+               die $err;
+           }
+           &$writedata($parent, '>', $res);
+       }
+
+       exit(0);
+    }
 
     return $self;
 }
 
+sub timeout {
+    my ($self, $new_timeout) = @_;
+
+    $self->{timeout} = $new_timeout if $new_timeout;
+
+    return $self->{timeout};
+}
+
 sub DESTROY {
     my ($self) = @_;
 
-    pve_rados_shutdown($self->{conn});
+    if ($self->{cpid}) {
+       #print "$$: DESTROY WAIT0\n";
+       &$kill_worker($self);
+       #print "$$: DESTROY WAIT\n";
+    } else {
+       #print "$$: DESTROY SHUTDOWN\n";
+       pve_rados_shutdown($self->{conn}) if $self->{conn};
+    }
 }
 
 sub cluster_stat {
-    my ($self) = @_;
-
-    return  pve_rados_cluster_stat($self->{conn});
+    my ($self, @args) = @_;
+
+    if ($self->{cpid}) {
+       my $data = encode_json(['cluster_stat', @args]);
+       my $raw = &$sendcmd($self, 'C', $data);
+       return decode_json($raw);
+    } else {
+       return pve_rados_cluster_stat($self->{conn});
+    }
 }
 
-# example: { prefix => 'mon dump', format => 'json' }
+# example1: { prefix => 'get_command_descriptions'})
+# example2: { prefix => 'mon dump', format => 'json' }
 sub mon_command {
     my ($self, $cmd) = @_;
 
     $cmd->{format} = 'json' if !$cmd->{format};
 
     my $json = encode_json($cmd);
-    my $raw = pve_rados_mon_command($self->{conn}, [ $json ]);
+
+    my $raw = &$sendcmd($self, 'M', $json);
+
     if ($cmd->{format} && $cmd->{format} eq 'json') {
        return length($raw) ? decode_json($raw) : undef;
     }