}
my $tlist = $rest_env->active_workers($upid);
- $rest_env->broadcast_tasklist($tlist);
+ eval { $rest_env->broadcast_tasklist($tlist); };
+ syslog('err', $@) if $@;
my $task;
foreach my $t (@$tlist) {
# we try to reduce the amount of data
# list all running tasks and task and a few others
# try to limit to 25 tasks
- my $ctime = time();
my $max = 25 - scalar(@$tlist);
foreach my $task (@ta) {
last if $max <= 0;
return 1;
}
+# Acts almost like tee(1): copies the worker's output to both STDOUT and the
+# task log. Unlike tee we are worker aware and also read the child's control
+# pipe, so we know whether the worker function succeeded or failed.
+#
+# Parameters:
+#   $childfd - file descriptor number of the read end of the output pipe
+#   $ctrlfd  - file descriptor number of the read end of the control pipe
+#   $taskfh  - file handle of the task log (used with print and ->flush)
+#   $cpid    - PID of the worker child; negated to signal its process group
+#
+# Both descriptors are closed before returning. Errors raised while copying
+# or while reading the status are not propagated to the caller; they are
+# printed to STDERR and appended to the task log as "TASK ERROR: ...".
+my $tee_worker = sub {
+    my ($childfd, $ctrlfd, $taskfh, $cpid) = @_;
+
+    eval {
+        my $int_count = 0;
+        local $SIG{INT} = local $SIG{QUIT} = local $SIG{TERM} = sub {
+            # always send signal to all pgrp members
+            my $kpid = -$cpid;
+            if ($int_count < 3) {
+                kill(15, $kpid); # send TERM signal
+            } else {
+                kill(9, $kpid); # send KILL signal
+            }
+            $int_count++;
+        };
+        local $SIG{PIPE} = sub { die "broken pipe\n"; };
+
+        my $select = IO::Select->new();
+        my $fh = IO::Handle->new_from_fd($childfd, 'r');
+        $select->add($fh);
+
+        my $readbuf = '';
+        while ($select->count) {
+            # wake up at least once per second so a vanished child is noticed
+            my @handles = $select->can_read(1);
+            if (scalar(@handles)) {
+                my $count = sysread($handles[0], $readbuf, 4096);
+                if (!defined($count)) {
+                    my $err = $!;
+                    die "sync pipe read error: $err\n";
+                }
+                last if $count == 0; # eof
+
+                print $readbuf;
+                select->flush(); # flush the currently selected output handle
+
+                print $taskfh $readbuf;
+                $taskfh->flush();
+            } else {
+                # some commands daemonize without closing stdout
+                last if !PVE::ProcFSTools::check_process_running($cpid);
+            }
+        }
+
+        # get status (error or OK)
+        # Use a dedicated buffer: POSIX::read leaves the buffer untouched on
+        # failure, so reusing $readbuf could make leftover worker output look
+        # like a control message. Retry when one of the signal handlers
+        # installed above interrupted the read.
+        my $ctrlbuf = '';
+        my $ctrl_count;
+        do {
+            $ctrl_count = POSIX::read($ctrlfd, $ctrlbuf, 4096);
+        } while (!defined($ctrl_count) && $!{EINTR});
+        die "control pipe read error: $!\n" if !defined($ctrl_count);
+
+        if ($ctrlbuf =~ m/^TASK OK\n?$/) {
+            # skip printing to stdout
+            print $taskfh $ctrlbuf;
+        } elsif ($ctrlbuf =~ m/^TASK ERROR: (.*)\n?$/) {
+            print STDERR "$1\n";
+            print $taskfh "\n$ctrlbuf"; # ensure start on new line for webUI
+        } else {
+            die "got unexpected control message: $ctrlbuf\n";
+        }
+        $taskfh->flush();
+    };
+    my $err = $@;
+
+    POSIX::close($childfd);
+    POSIX::close($ctrlfd);
+
+    if ($err) {
+        # keep the error on a single line in both STDERR and the task log
+        $err =~ s/\n/ /mg;
+        print STDERR "$err\n";
+        print $taskfh "TASK ERROR: $err\n";
+    }
+};
+
# start long running workers
# STDIN is redirected to /dev/null
# STDOUT,STDERR are redirected to the filename returned by upid_decode
my @psync = POSIX::pipe();
my @csync = POSIX::pipe();
+ my @ctrlfd = POSIX::pipe() if $sync;
my $node = $self->{nodename};
POSIX::setsid();
POSIX::close ($psync[0]);
+ POSIX::close ($ctrlfd[0]) if $sync;
POSIX::close ($csync[1]);
$outfh = $sync ? $psync[1] : undef;
+ my $resfh = $sync ? $ctrlfd[1] : undef;
eval {
PVE::INotify::inotify_close();
if !open(STDIN, "</dev/null");
$outfh = PVE::Tools::upid_open($upid);
+ $resfh = fileno($outfh);
}
}
# sync with parent (signal that we are ready)
- if ($sync) {
- print "$upid\n";
- } else {
- POSIX::write($psync[1], $upid, length ($upid));
- POSIX::close($psync[1]);
- }
+ POSIX::write($psync[1], $upid, length ($upid));
+ POSIX::close($psync[1]) if !$sync; # don't need output pipe if async
- my $readbuf = '';
- # sync with parent (wait until parent is ready)
- POSIX::read($csync[0], $readbuf, 4096);
- die "parent setup error\n" if $readbuf ne 'OK';
+ eval {
+ my $readbuf = '';
+ # sync with parent (wait until parent is ready)
+ POSIX::read($csync[0], $readbuf, 4096);
+ die "parent setup error\n" if $readbuf ne 'OK';
- if ($self->{type} eq 'ha') {
- print "task started by HA resource agent\n";
- }
- eval { &$function($upid); };
+ if ($self->{type} eq 'ha') {
+ print "task started by HA resource agent\n";
+ }
+ &$function($upid);
+ };
my $err = $@;
if ($err) {
chomp $err;
$err =~ s/\n/ /mg;
syslog('err', $err);
- print STDERR "TASK ERROR: $err\n";
+ my $msg = "TASK ERROR: $err\n";
+ POSIX::write($resfh, $msg, length($msg));
+ POSIX::close($resfh) if $sync;
POSIX::_exit(-1);
} else {
- print STDERR "TASK OK\n";
+ my $msg = "TASK OK\n";
+ POSIX::write($resfh, $msg, length($msg));
+ POSIX::close($resfh) if $sync;
POSIX::_exit(0);
}
kill(-9, $$);
# parent
POSIX::close ($psync[1]);
+ POSIX::close ($ctrlfd[1]) if $sync;
POSIX::close ($csync[0]);
my $readbuf = '';
$self->log_cluster_msg('info', $user, "starting task $upid");
my $tlist = $self->active_workers($upid, $sync);
- $self->broadcast_tasklist($tlist);
+ eval { $self->broadcast_tasklist($tlist); };
+ syslog('err', $@) if $@;
my $res = 0;
if ($sync) {
- my $count;
- my $outbuf = '';
- my $int_count = 0;
- eval {
- local $SIG{INT} = local $SIG{QUIT} = local $SIG{TERM} = sub {
- # always send signal to all pgrp members
- my $kpid = -$cpid;
- if ($int_count < 3) {
- kill(15, $kpid); # send TERM signal
- } else {
- kill(9, $kpid); # send KILL signal
- }
- $int_count++;
- };
- local $SIG{PIPE} = sub { die "broken pipe\n"; };
-
- my $select = new IO::Select;
- my $fh = IO::Handle->new_from_fd($psync[0], 'r');
- $select->add($fh);
-
- while ($select->count) {
- my @handles = $select->can_read(1);
- if (scalar(@handles)) {
- my $count = sysread ($handles[0], $readbuf, 4096);
- if (!defined ($count)) {
- my $err = $!;
- die "sync pipe read error: $err\n";
- }
- last if $count == 0; # eof
-
- $outbuf .= $readbuf;
- while ($outbuf =~ s/^(([^\010\r\n]*)(\r|\n|(\010)+|\r\n))//s) {
- my $line = $1;
- my $data = $2;
- if ($data =~ m/^TASK OK$/) {
- # skip
- } elsif ($data =~ m/^TASK ERROR: (.+)$/) {
- print STDERR "$1\n";
- } else {
- print $line;
- }
- if ($outfh) {
- print $outfh $line;
- $outfh->flush();
- }
- }
- } else {
- # some commands daemonize without closing stdout
- last if !PVE::ProcFSTools::check_process_running($cpid);
- }
- }
- };
- my $err = $@;
- POSIX::close($psync[0]);
-
- if ($outbuf) { # just to be sure
- print $outbuf;
- if ($outfh) {
- print $outfh $outbuf;
- }
- }
-
- if ($err) {
- $err =~ s/\n/ /mg;
- print STDERR "$err\n";
- if ($outfh) {
- print $outfh "TASK ERROR: $err\n";
- }
- }
+ $tee_worker->($psync[0], $ctrlfd[0], $outfh, $cpid);
&$kill_process_group($cpid, $pstart); # make sure it gets killed