X-Git-Url: https://git.proxmox.com/?p=pve-common.git;a=blobdiff_plain;f=src%2FPVE%2FRESTEnvironment.pm;h=ebf8a2e7aaaadc8e88b50929b8f15e6e00e98dbd;hp=6c1ed59ae51c662f2894761e7a2cb1abf3d13428;hb=2311859b0da26e1806f5015783bfebb5aca16300;hpb=a313fe733560aeefcf083fd4adc59945e720d7ac diff --git a/src/PVE/RESTEnvironment.pm b/src/PVE/RESTEnvironment.pm index 6c1ed59..ebf8a2e 100644 --- a/src/PVE/RESTEnvironment.pm +++ b/src/PVE/RESTEnvironment.pm @@ -47,7 +47,8 @@ my $log_task_result = sub { } my $tlist = $rest_env->active_workers($upid); - $rest_env->broadcast_tasklist($tlist); + eval { $rest_env->broadcast_tasklist($tlist); }; + syslog('err', $@) if $@; my $task; foreach my $t (@$tlist) { @@ -60,8 +61,7 @@ my $log_task_result = sub { $msg = $task->{status}; } - $rest_env->log_cluster_msg($pri, $user, "end task $upid $msg") - if $rest_env; + $rest_env->log_cluster_msg($pri, $user, "end task $upid $msg"); }; my $worker_reaper = sub { @@ -210,11 +210,11 @@ sub set_user { } sub get_user { - my ($self) = @_; + my ($self, $noerr) = @_; - die "user name not set\n" if !$self->{user}; + return $self->{user} if defined($self->{user}) || $noerr; - return $self->{user}; + die "user name not set\n"; } sub is_worker { @@ -319,7 +319,6 @@ sub active_workers { # we try to reduce the amount of data # list all running tasks and task and a few others # try to limit to 25 tasks - my $ctime = time(); my $max = 25 - scalar(@$tlist); foreach my $task (@ta) { last if $max <= 0; @@ -374,6 +373,78 @@ sub check_worker { return 1; } +# acts almost as tee: writes an output both to STDOUT and a task log, +# we differ as we're worker aware and look also at the childs control pipe, +# so we know if the function could be executed successfully or not. +my $tee_worker = sub { + my ($childfd, $ctrlfd, $taskfh, $cpid) = @_; + + eval { + my $int_count = 0; + local $SIG{INT} = local $SIG{QUIT} = local $SIG{TERM} = sub { + # always send signal to all pgrp members + my $kpid = -$cpid; + if ($int_count < 3) { + kill(15, $kpid); # send TERM signal + } else { + kill(9, $kpid); # send KILL signal + } + $int_count++; + }; + local $SIG{PIPE} = sub { die "broken pipe\n"; }; + + my $select = new IO::Select; + my $fh = IO::Handle->new_from_fd($childfd, 'r'); + $select->add($fh); + + my $readbuf = ''; + my $count; + while ($select->count) { + my @handles = $select->can_read(1); + if (scalar(@handles)) { + my $count = sysread ($handles[0], $readbuf, 4096); + if (!defined ($count)) { + my $err = $!; + die "sync pipe read error: $err\n"; + } + last if $count == 0; # eof + + print $readbuf; + select->flush(); + + print $taskfh $readbuf; + $taskfh->flush(); + } else { + # some commands daemonize without closing stdout + last if !PVE::ProcFSTools::check_process_running($cpid); + } + } + + # get status (error or OK) + POSIX::read($ctrlfd, $readbuf, 4096); + if ($readbuf =~ m/^TASK OK\n?$/) { + # skip printing to stdout + print $taskfh $readbuf; + } elsif ($readbuf =~ m/^TASK ERROR: (.*)\n?$/) { + print STDERR "$1\n"; + print $taskfh "\n$readbuf"; # ensure start on new line for webUI + } else { + die "got unexpected control message: $readbuf\n"; + } + $taskfh->flush(); + }; + my $err = $@; + + POSIX::close($childfd); + POSIX::close($ctrlfd); + + if ($err) { + $err =~ s/\n/ /mg; + print STDERR "$err\n"; + print $taskfh "TASK ERROR: $err\n"; + } +}; + # start long running workers # STDIN is redirected to /dev/null # STDOUT,STDERR are redirected to the filename returned by upid_decode @@ -397,6 +468,7 @@ sub fork_worker { my @psync = POSIX::pipe(); my @csync = POSIX::pipe(); + my @ctrlfd = POSIX::pipe() if $sync; my $node = $self->{nodename}; @@ -428,9 +500,11 @@ sub fork_worker { POSIX::setsid(); POSIX::close ($psync[0]); + POSIX::close ($ctrlfd[0]) if $sync; POSIX::close ($csync[1]); $outfh = $sync ? $psync[1] : undef; + my $resfh = $sync ? $ctrlfd[1] : undef; eval { PVE::INotify::inotify_close(); @@ -451,6 +525,7 @@ sub fork_worker { if !open(STDIN, "{type} eq 'ha') { - print "task started by HA resource agent\n"; - } - eval { &$function($upid); }; + if ($self->{type} eq 'ha') { + print "task started by HA resource agent\n"; + } + &$function($upid); + }; my $err = $@; if ($err) { chomp $err; $err =~ s/\n/ /mg; syslog('err', $err); - print STDERR "TASK ERROR: $err\n"; + my $msg = "TASK ERROR: $err\n"; + POSIX::write($resfh, $msg, length($msg)); + POSIX::close($resfh) if $sync; POSIX::_exit(-1); } else { - print STDERR "TASK OK\n"; + my $msg = "TASK OK\n"; + POSIX::write($resfh, $msg, length($msg)); + POSIX::close($resfh) if $sync; POSIX::_exit(0); } kill(-9, $$); @@ -516,6 +593,7 @@ sub fork_worker { # parent POSIX::close ($psync[1]); + POSIX::close ($ctrlfd[1]) if $sync; POSIX::close ($csync[0]); my $readbuf = ''; @@ -560,81 +638,14 @@ sub fork_worker { $self->log_cluster_msg('info', $user, "starting task $upid"); my $tlist = $self->active_workers($upid, $sync); - $self->broadcast_tasklist($tlist); + eval { $self->broadcast_tasklist($tlist); }; + syslog('err', $@) if $@; my $res = 0; if ($sync) { - my $count; - my $outbuf = ''; - my $int_count = 0; - eval { - local $SIG{INT} = local $SIG{QUIT} = local $SIG{TERM} = sub { - # always send signal to all pgrp members - my $kpid = -$cpid; - if ($int_count < 3) { - kill(15, $kpid); # send TERM signal - } else { - kill(9, $kpid); # send KILL signal - } - $int_count++; - }; - local $SIG{PIPE} = sub { die "broken pipe\n"; }; - - my $select = new IO::Select; - my $fh = IO::Handle->new_from_fd($psync[0], 'r'); - $select->add($fh); - - while ($select->count) { - my @handles = $select->can_read(1); - if (scalar(@handles)) { - my $count = sysread ($handles[0], $readbuf, 4096); - if (!defined ($count)) { - my $err = $!; - die "sync pipe read error: $err\n"; - } - last if $count == 0; # eof - - $outbuf .= $readbuf; - while ($outbuf =~ s/^(([^\010\r\n]*)(\r|\n|(\010)+|\r\n))//s) { - my $line = $1; - my $data = $2; - if ($data =~ m/^TASK OK$/) { - # skip - } elsif ($data =~ m/^TASK ERROR: (.+)$/) { - print STDERR "$1\n"; - } else { - print $line; - } - if ($outfh) { - print $outfh $line; - $outfh->flush(); - } - } - } else { - # some commands daemonize without closing stdout - last if !PVE::ProcFSTools::check_process_running($cpid); - } - } - }; - my $err = $@; - POSIX::close($psync[0]); - - if ($outbuf) { # just to be sure - print $outbuf; - if ($outfh) { - print $outfh $outbuf; - } - } - - if ($err) { - $err =~ s/\n/ /mg; - print STDERR "$err\n"; - if ($outfh) { - print $outfh "TASK ERROR: $err\n"; - } - } + $tee_worker->($psync[0], $ctrlfd[0], $outfh, $cpid); &$kill_process_group($cpid, $pstart); # make sure it gets killed @@ -687,7 +698,9 @@ sub check_api2_permissions { sub init_request { my ($self, %params) = @_; - # implement in subclass + $self->{result_attributes} = {} + + # if you nedd more, implement in subclass } 1;