use IO::Handle;
use IO::Select;
use POSIX qw(:sys_wait_h EINTR);
+use AnyEvent;
use PVE::Exception qw(raise raise_perm_exc);
use PVE::INotify;
die "unknown environment type"
if !$type || $type !~ m/^(cli|pub|priv|ha)$/;
- $SIG{CHLD} = $worker_reaper;
+ $SIG{CHLD} = sub {
+ # when we're using AnyEvent, we have to postpone the call to worker_reaper, otherwise it
+ # might interfere with running api calls
+ if (defined($AnyEvent::MODEL)) {
+ AnyEvent::postpone { $worker_reaper->() };
+ } else {
+ $worker_reaper->();
+ }
+ };
# environment types
# cli ... command started fron command line
return $WORKER_FLAG;
}
-# read/update list of active workers
-# we move all finished tasks to the archive index,
-# but keep aktive and most recent task in the active file.
-# $nocheck ... consider $new_upid still running (avoid that
-# we try to read the reult to early.
-sub active_workers {
+# read/update list of active workers.
+#
+# we move all finished tasks to the archive index, but keep active, and most recent tasks in the
+# active file.
+# $nocheck ... consider $new_upid still running (avoid that we try to read the result to early).
+sub active_workers {
my ($self, $new_upid, $nocheck) = @_;
- my $lkfn = "/var/log/pve/tasks/.active.lock";
-
my $timeout = 10;
- my $code = sub {
-
+ my $res = PVE::Tools::lock_file("/var/log/pve/tasks/.active.lock", $timeout, sub {
my $tasklist = PVE::INotify::read_file('active');
my @ta;
&$check_task($task);
}
- if ($new_upid && !(my $task = $thash->{$new_upid})) {
- $task = PVE::Tools::upid_decode($new_upid);
+ if ($new_upid && !$thash->{$new_upid}) {
+ my $task = PVE::Tools::upid_decode($new_upid);
$task->{upid} = $new_upid;
$thash->{$new_upid} = $task;
&$check_task($task, $nocheck);
}
}
- # we try to reduce the amount of data
- # list all running tasks and task and a few others
- # try to limit to 25 tasks
- my $max = 25 - scalar(@$tlist);
+ # we try to reduce the amount of data list all running tasks and task and a few others
+ my $MAX_FINISHED = 25;
+ my $max = $MAX_FINISHED - scalar(@$tlist);
foreach my $task (@ta) {
last if $max <= 0;
push @$tlist, $task;
PVE::INotify::write_file('active', $tlist) if $save;
return $tlist;
- };
-
- my $res = PVE::Tools::lock_file($lkfn, $timeout, $code);
+ });
die $@ if $@;
return $res;
};
local $SIG{PIPE} = sub { die "broken pipe\n"; };
- my $select = new IO::Select;
+ my $select = IO::Select->new();
my $fh = IO::Handle->new_from_fd($childfd, 'r');
$select->add($fh);
$dtype = 'unknown' if !defined ($dtype);
$id = '' if !defined ($id);
- $user = 'root@pve' if !defined ($user);
+ # note: below is only used for the task log entry
+ $user = $self->get_user(1) // 'root@pam' if !defined($user);
my $sync = ($self->{type} eq 'cli' && !$background) ? 1 : 0;
my @psync = POSIX::pipe();
my @csync = POSIX::pipe();
- my @ctrlfd = POSIX::pipe() if $sync;
+ my @ctrlfd = $sync ? POSIX::pipe() : ();
my $node = $self->{nodename};
close STDIN;
POSIX::close(0) if $fd != 0;
- die "unable to redirect STDIN - $!"
- if !open(STDIN, "</dev/null");
+ open(STDIN, '<', '/dev/null') or die "unable to redirect STDIN - $!";
$outfh = PVE::Tools::upid_open($upid);
$resfh = fileno($outfh);
close STDOUT;
POSIX::close (1) if $fd != 1;
- die "unable to redirect STDOUT - $!"
- if !open(STDOUT, ">&", $outfh);
+ open(STDOUT, ">&", $outfh) or die "unable to redirect STDOUT - $!";
STDOUT->autoflush (1);
close STDERR;
POSIX::close(2) if $fd != 2;
- die "unable to redirect STDERR - $!"
- if !open(STDERR, ">&1");
+ open(STDERR, '>&', '1') or die "unable to redirect STDERR - $!";
STDERR->autoflush(1);
};
return wantarray ? ($upid, $res) : $upid;
}
+sub log_warn {
+ my ($message) = @_;
+
+ if ($rest_env) {
+ $rest_env->warn($message);
+ } else {
+ chomp($message);
+ print STDERR "WARN: $message\n";
+ }
+}
+
sub warn {
my ($self, $message) = @_;