]> git.proxmox.com Git - pve-common.git/blobdiff - src/PVE/RESTEnvironment.pm
fix #1819: fork_worker: ensure sync'ed workers control terminal
[pve-common.git] / src / PVE / RESTEnvironment.pm
index c528e61704402a69e79c095fedf0c204de4409e1..3155aac849de59a2f9f7eb1fb028c97422a66b84 100644 (file)
@@ -47,7 +47,8 @@ my $log_task_result = sub {
     }
 
     my $tlist = $rest_env->active_workers($upid);
-    $rest_env->broadcast_tasklist($tlist);
+    eval { $rest_env->broadcast_tasklist($tlist); };
+    syslog('err', $@) if $@;
 
     my $task;
     foreach my $t (@$tlist) {
@@ -60,8 +61,7 @@ my $log_task_result = sub {
        $msg = $task->{status};
     }
 
-    $rest_env->log_cluster_msg($pri, $user, "end task $upid $msg")
-       if $rest_env;
+    $rest_env->log_cluster_msg($pri, $user, "end task $upid $msg");
 };
 
 my $worker_reaper = sub {
@@ -270,7 +270,7 @@ sub active_workers  {
        }
 
 
-       @ta = sort { $b->{starttime} cmp $a->{starttime} } @ta;
+       @ta = sort { $b->{starttime} <=> $a->{starttime} } @ta;
 
        my $save = defined($new_upid);
 
@@ -319,7 +319,6 @@ sub active_workers  {
        # we try to reduce the amount of data
        # list all running tasks and task and a few others
        # try to limit to 25 tasks
-       my $ctime = time();
        my $max = 25 - scalar(@$tlist);
         foreach my $task (@ta) {
            last if $max <= 0;
@@ -374,6 +373,78 @@ sub check_worker {
     return 1;
 }
 
+# acts almost as tee: writes an output both to STDOUT and a task log,
+# we differ as we're worker aware and look also at the childs control pipe,
+# so we know if the function could be executed successfully or not.
+my $tee_worker = sub {
+    my ($childfd, $ctrlfd, $taskfh, $cpid) = @_;
+
+    eval {
+       my $int_count = 0;
+       local $SIG{INT} = local $SIG{QUIT} = local $SIG{TERM} = sub {
+           # always send signal to all pgrp members
+           my $kpid = -$cpid;
+           if ($int_count < 3) {
+               kill(15, $kpid); # send TERM signal
+           } else {
+               kill(9, $kpid); # send KILL signal
+           }
+           $int_count++;
+       };
+       local $SIG{PIPE} = sub { die "broken pipe\n"; };
+
+       my $select = new IO::Select;
+       my $fh = IO::Handle->new_from_fd($childfd, 'r');
+       $select->add($fh);
+
+       my $readbuf = '';
+       my $count;
+       while ($select->count) {
+           my @handles = $select->can_read(1);
+           if (scalar(@handles)) {
+               my $count = sysread ($handles[0], $readbuf, 4096);
+               if (!defined ($count)) {
+                   my $err = $!;
+                   die "sync pipe read error: $err\n";
+               }
+               last if $count == 0; # eof
+
+               print $readbuf;
+               select->flush();
+
+               print $taskfh $readbuf;
+               $taskfh->flush();
+           } else {
+               # some commands daemonize without closing stdout
+               last if !PVE::ProcFSTools::check_process_running($cpid);
+           }
+       }
+
+       # get status (error or OK)
+       POSIX::read($ctrlfd, $readbuf, 4096);
+       if ($readbuf =~ m/^TASK OK\n?$/) {
+           # skip printing to stdout
+           print $taskfh $readbuf;
+       } elsif ($readbuf =~ m/^TASK ERROR: (.*)\n?$/) {
+           print STDERR "$1\n";
+           print $taskfh "\n$readbuf"; # ensure start on new line for webUI
+       } else {
+           die "got unexpected control message: $readbuf\n";
+       }
+       $taskfh->flush();
+    };
+    my $err = $@;
+
+    POSIX::close($childfd);
+    POSIX::close($ctrlfd);
+
+    if ($err) {
+       $err =~ s/\n/ /mg;
+       print STDERR "$err\n";
+       print $taskfh "TASK ERROR: $err\n";
+    }
+};
+
 # start long running workers
 # STDIN is redirected to /dev/null
 # STDOUT,STDERR are redirected to the filename returned by upid_decode
@@ -397,6 +468,7 @@ sub fork_worker {
 
     my @psync = POSIX::pipe();
     my @csync = POSIX::pipe();
+    my @ctrlfd = POSIX::pipe() if $sync;
 
     my $node = $self->{nodename};
 
@@ -422,15 +494,23 @@ sub fork_worker {
        $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = sub { die "received interrupt\n"; };
 
        $SIG{CHLD} = $SIG{PIPE} = 'DEFAULT';
+       $SIG{TTOU} = 'IGNORE';
 
        # set sess/process group - we want to be able to kill the
        # whole process group
-       POSIX::setsid();
+       if ($sync && -t STDIN) {
+           POSIX::setpgid(0,0) or die "failed to setpgid: $!\n";;
+           POSIX::tcsetpgrp(fileno(STDIN), $$) or die "failed to tcsetpgrp: $!\n";
+       } else {
+           POSIX::setsid();
+       }
 
        POSIX::close ($psync[0]);
+       POSIX::close ($ctrlfd[0]) if $sync;
        POSIX::close ($csync[1]);
 
        $outfh = $sync ? $psync[1] : undef;
+       my $resfh = $sync ? $ctrlfd[1] : undef;
 
        eval {
            PVE::INotify::inotify_close();
@@ -451,6 +531,7 @@ sub fork_worker {
                    if !open(STDIN, "</dev/null");
 
                $outfh = PVE::Tools::upid_open($upid);
+               $resfh = fileno($outfh);
            }
 
 
@@ -483,31 +564,33 @@ sub fork_worker {
        }
 
        # sync with parent (signal that we are ready)
-       if ($sync) {
-           print "$upid\n";
-       } else {
-           POSIX::write($psync[1], $upid, length ($upid));
-           POSIX::close($psync[1]);
-       }
+       POSIX::write($psync[1], $upid, length ($upid));
+       POSIX::close($psync[1]) if !$sync; # don't need output pipe if async
 
-       my $readbuf = '';
-       # sync with parent (wait until parent is ready)
-       POSIX::read($csync[0], $readbuf, 4096);
-       die "parent setup error\n" if $readbuf ne 'OK';
+       eval {
+           my $readbuf = '';
+           # sync with parent (wait until parent is ready)
+           POSIX::read($csync[0], $readbuf, 4096);
+           die "parent setup error\n" if $readbuf ne 'OK';
 
-       if ($self->{type} eq 'ha') {
-           print "task started by HA resource agent\n";
-       }
-       eval { &$function($upid); };
+           if ($self->{type} eq 'ha') {
+               print "task started by HA resource agent\n";
+           }
+           &$function($upid);
+       };
        my $err = $@;
        if ($err) {
            chomp $err;
            $err =~ s/\n/ /mg;
            syslog('err', $err);
-           print STDERR "TASK ERROR: $err\n";
+           my $msg = "TASK ERROR: $err\n";
+           POSIX::write($resfh, $msg, length($msg));
+           POSIX::close($resfh) if $sync;
            POSIX::_exit(-1);
        } else {
-           print STDERR "TASK OK\n";
+           my $msg = "TASK OK\n";
+           POSIX::write($resfh, $msg, length($msg));
+           POSIX::close($resfh) if $sync;
            POSIX::_exit(0);
        }
        kill(-9, $$);
@@ -516,6 +599,7 @@ sub fork_worker {
     # parent
 
     POSIX::close ($psync[1]);
+    POSIX::close ($ctrlfd[1]) if $sync;
     POSIX::close ($csync[0]);
 
     my $readbuf = '';
@@ -560,81 +644,14 @@ sub fork_worker {
     $self->log_cluster_msg('info', $user, "starting task $upid");
 
     my $tlist = $self->active_workers($upid, $sync);
-    $self->broadcast_tasklist($tlist);
+    eval { $self->broadcast_tasklist($tlist); };
+    syslog('err', $@) if $@;
 
     my $res = 0;
 
     if ($sync) {
-       my $count;
-       my $outbuf = '';
-       my $int_count = 0;
-       eval {
-           local $SIG{INT} = local $SIG{QUIT} = local $SIG{TERM} = sub {
-               # always send signal to all pgrp members
-               my $kpid = -$cpid;
-               if ($int_count < 3) {
-                   kill(15, $kpid); # send TERM signal
-               } else {
-                   kill(9, $kpid); # send KILL signal
-               }
-               $int_count++;
-           };
-           local $SIG{PIPE} = sub { die "broken pipe\n"; };
-
-           my $select = new IO::Select;
-           my $fh = IO::Handle->new_from_fd($psync[0], 'r');
-           $select->add($fh);
-
-           while ($select->count) {
-               my @handles = $select->can_read(1);
-               if (scalar(@handles)) {
-                   my $count = sysread ($handles[0], $readbuf, 4096);
-                   if (!defined ($count)) {
-                       my $err = $!;
-                       die "sync pipe read error: $err\n";
-                   }
-                   last if $count == 0; # eof
-
-                   $outbuf .= $readbuf;
-                   while ($outbuf =~ s/^(([^\010\r\n]*)(\r|\n|(\010)+|\r\n))//s) {
-                       my $line = $1;
-                       my $data = $2;
-                       if ($data =~ m/^TASK OK$/) {
-                           # skip
-                       } elsif ($data =~ m/^TASK ERROR: (.+)$/) {
-                           print STDERR "$1\n";
-                       } else {
-                           print $line;
-                       }
-                       if ($outfh) {
-                           print $outfh $line;
-                           $outfh->flush();
-                       }
-                   }
-               } else {
-                   # some commands daemonize without closing stdout
-                   last if !PVE::ProcFSTools::check_process_running($cpid);
-               }
-           }
-       };
-       my $err = $@;
 
-       POSIX::close($psync[0]);
-
-       if ($outbuf) { # just to be sure
-           print $outbuf;
-           if ($outfh) {
-               print $outfh $outbuf;
-           }
-       }
-
-       if ($err) {
-           $err =~ s/\n/ /mg;
-           print STDERR "$err\n";
-           if ($outfh) {
-               print $outfh "TASK ERROR: $err\n";
-           }
-       }
+       $tee_worker->($psync[0], $ctrlfd[0], $outfh, $cpid);
 
        &$kill_process_group($cpid, $pstart); # make sure it gets killed