Daemon: implement pre-fork server (max_wrokers property)
authorDietmar Maurer <dietmar@proxmox.com>
Tue, 30 Dec 2014 10:19:53 +0000 (11:19 +0100)
committerDietmar Maurer <dietmar@proxmox.com>
Wed, 31 Dec 2014 08:47:41 +0000 (09:47 +0100)
data/PVE/Daemon.pm

index 279a78c..757c2d1 100644 (file)
@@ -8,7 +8,7 @@ package PVE::Daemon;
 # * correctly daemonize (redirect STDIN/STDOUT)
 # * restart by stop/start, exec, or signal HUP
 # * daemon restart on error (option 'restart_on_error')
-#
+# * handle worker processes (option 'max_workers')
 
 use strict;
 use warnings;
@@ -75,6 +75,121 @@ my $server_cleanup = sub {
     unlink $self->{pidfile};
 };
 
+my $finish_workers = sub {
+    my ($self) = @_;
+
+    foreach my $cpid (keys %{$self->{workers}}) {
+        my $waitpid = waitpid($cpid, WNOHANG);
+        if (defined($waitpid) && ($waitpid == $cpid)) {
+            delete ($self->{workers}->{$cpid});
+           syslog('info', "worker $cpid finished");
+       }
+    }
+};
+
+my $start_workers = sub {
+    my ($self) = @_;
+
+    return if $self->{terminate};
+
+    my $count = 0;
+    foreach my $cpid (keys %{$self->{workers}}) {
+       $count++;
+    }
+
+    my $need = $self->{max_workers} - $count;
+
+    return if $need <= 0;
+
+    syslog('info', "starting $need worker(s)");
+
+    while ($need > 0) {
+       my $pid = fork;
+
+       if (!defined ($pid)) {
+           syslog('err', "can't fork worker");
+           sleep (1);
+       } elsif ($pid) { # parent
+           $self->{workers}->{$pid} = 1;
+           syslog('info', "worker $pid started");
+           $need--;
+       } else {
+           $0 = "$self->{name} worker";
+
+           close(FLCK);
+
+           PVE::INotify::inotify_close();
+
+           for my $sig (qw(CHLD HUP INT TERM QUIT)) {
+               $SIG{$sig} = 'DEFAULT'; # restore default handler
+               # AnyEvent signals only works if $SIG{XX} is 
+               # undefined (perl event loop)
+               delete $SIG{$sig}; # so that we can handle events with AnyEvent
+           }
+
+           eval { $self->run(); };
+           if (my $err = $@) {
+               syslog('err', $err);
+               sleep(5); # avoid fast restarts
+           }
+
+           syslog('info', "worker exit");
+           exit (0);
+       }
+    }
+};
+
+my $terminate_server = sub {
+    my ($self) = @_;
+
+    $self->{terminate} = 1; # set flag to avoid worker restart
+
+    if (!$self->{max_workers}) {
+       eval { $self->shutdown(); };
+       warn $@ if $@;
+       return;
+    }
+
+    eval { $self->shutdown(); };
+    warn $@ if $@;
+
+    # we have workers - terminate them
+
+    foreach my $cpid (keys %{$self->{workers}}) {
+       kill(15, $cpid); # TERM childs
+    }
+
+    # nicely shutdown childs (give them max 10 seconds to shut down)
+    my $previous_alarm = alarm(10);
+    eval {
+       local $SIG{ALRM} = sub { die "timeout\n" };
+
+       while ((my $pid = waitpid (-1, 0)) > 0) {
+           if (defined($self->{workers}->{$pid})) {
+               delete($self->{workers}->{$pid});
+               syslog('info', "worker $pid finished");
+           }
+       }
+       alarm(0); # avoid race condition
+    };
+    my $err = $@;
+
+    alarm ($previous_alarm);
+
+    if ($err) {
+       syslog('err', "error stopping workers (will kill them now) - $err");
+       foreach my $cpid (keys %{$self->{workers}}) {
+           # KILL childs still alive!
+           if (kill (0, $cpid)) {
+               delete($self->{workers}->{$cpid});
+               syslog("err", "kill worker $cpid");
+               kill(9, $cpid);
+               # fixme: waitpid?
+           }
+       }
+    }
+};
+
 my $server_run = sub {
     my ($self, $debug) = @_;
 
@@ -121,27 +236,73 @@ my $server_run = sub {
 
     open STDERR, '>&STDOUT' || die "can't close STDERR\n";
 
-    $SIG{INT} = $SIG{TERM} = $SIG{QUIT} = sub {
-       $SIG{INT} = 'DEFAULT';
+    my $old_sig_term = $SIG{TERM};
+    local $SIG{TERM} = sub {
+       local ($@, $!, $?); # do not overwrite error vars
+       syslog('info', "received signal TERM");
+       &$terminate_server($self);
+       &$server_cleanup($self);
+       &$old_sig_term(@_) if $old_sig_term;
+    };
 
-       eval { $self->shutdown(); };
-       warn $@ if $@;
+    my $old_sig_quit = $SIG{QUIT};
+    local $SIG{QUIT} = sub {
+       local ($@, $!, $?); # do not overwrite error vars
+       syslog('info', "received signal QUIT");
+       &$terminate_server($self);
+       &$server_cleanup($self);
+       &$old_sig_quit(@_) if $old_sig_quit;
+    };
 
+    my $old_sig_int = $SIG{INT};
+    local $SIG{INT} = sub {
+       local ($@, $!, $?); # do not overwrite error vars
+       syslog('info', "received signal INT");
+       $SIG{INT} = 'DEFAULT'; # allow to terminate now
+       &$terminate_server($self);
        &$server_cleanup($self);
+       &$old_sig_int(@_) if $old_sig_int;
     };
 
-    if ($self->can('hup')) {
-       $SIG{HUP} = sub {
+    $SIG{HUP} = sub {
+       local ($@, $!, $?); # do not overwrite error vars
+       syslog('info', "received signal HUP");
+       if ($self->{max_workers}) {
+           &$terminate_server($self);
+           $self->{got_hup_signal} = 1;
+       } elsif ($self->can('hup')) {
            eval { $self->hup() };
            warn $@ if $@;
-       };
-    }
+       }
+    };
+
+    eval { 
+       if ($self->{max_workers}) {
+           my $old_sig_chld = $SIG{CHLD};
+           local $SIG{CHLD} = sub {
+               local ($@, $!, $?); # do not overwrite error vars
+               &$finish_workers($self);
+               &$old_sig_chld(@_) if $old_sig_chld;
+           };
+
+           for (;;) { # forever
+               &$start_workers($self);
+               sleep(5);
+               &$finish_workers($self);
+               last if $self->{terminate};
+           }
 
-    eval { $self->run() };
+       } else {
+           $self->run();
+       } 
+    };
     my $err = $@;
 
     if ($err) {
        syslog ('err', "ERROR: $err");
+
+       # fixme: kill all workers
+
        if (my $wait_time = $self->{restart_on_error}) {
            $self->restart_daemon($wait_time);
        } else {
@@ -149,13 +310,17 @@ my $server_run = sub {
        }
     }
 
-    $self->exit_daemon(0);
+    if ($self->{got_hup_signal}) {
+       $self->restart_daemon();
+    } else {
+       $self->exit_daemon(0);
+    }
 };
 
 sub new {
     my ($this, $name, $cmdline, %params) = @_;
 
-    die "please run as root\n" if $> != 0;
+    die "please run as root\n" if !$ENV{RESTART_PVE_DAEMON} && ($> != 0);
 
     die "missing name" if !$name;
 
@@ -171,6 +336,7 @@ sub new {
     my $self = bless { 
        name => $name,
        run_dir => '/var/run',
+       workers => {},
     }, $class;
 
     foreach my $opt (keys %params) {
@@ -181,6 +347,8 @@ sub new {
            $self->{$opt} = $value;
        } elsif ($opt eq 'run_dir') {
            $self->{$opt} = $value;
+       } elsif ($opt eq 'max_workers') {
+           $self->{$opt} = $value;
        } else {
            die "unknown option '$opt'";
        }
@@ -190,10 +358,14 @@ sub new {
 
     $self->{nodename} = PVE::INotify::nodename();
 
-    $self->{cmdline} = $cmdline;
+    $self->{cmdline} = [];
 
-    $0 = $name;
+    foreach my $el (@$cmdline) {
+       $el =~ m/^(.*)$/; # untaint
+       push @{$self->{cmdline}}, $1;
+    }
 
+    $0 = $name;
 
     return $self;
 }
@@ -237,8 +409,10 @@ sub shutdown {
 
     syslog('info' , "server closing");
 
-    # wait for children
-    1 while (waitpid(-1, POSIX::WNOHANG()) > 0);
+    if (!$self->{max_workers}) {
+       # wait for children
+       1 while (waitpid(-1, POSIX::WNOHANG()) > 0);
+    }
 }
 
 # please define in subclass
@@ -364,7 +538,8 @@ my $reload_daemon = sub {
            $self->start();
        } else {
            if ($use_hup) {
-               kill(1, $pid);
+               syslog('info', "send HUP to $pid");
+               kill 1, $pid;
            } else {
                $self->stop();
                $self->start();