X-Git-Url: https://git.proxmox.com/?p=pve-common.git;a=blobdiff_plain;f=src%2FPVE%2FRESTEnvironment.pm;h=b32c452947fdbdbecd6c4a5582718fdf021fa3a2;hp=6cf422f851976e230f2e4d2e28ba9bf9e860659a;hb=61aca93afb586c57d0135036b763b3aa67968084;hpb=d9072797ea62384d247c9fb01de90f9dd31ea4c3 diff --git a/src/PVE/RESTEnvironment.pm b/src/PVE/RESTEnvironment.pm index 6cf422f..b32c452 100644 --- a/src/PVE/RESTEnvironment.pm +++ b/src/PVE/RESTEnvironment.pm @@ -26,7 +26,7 @@ my $rest_env; # and register forked processes with &$register_worker(pid) # Note: using $SIG{CHLD} = 'IGNORE' or $SIG{CHLD} = sub { wait (); } or ... # has serious side effects, because perls built in system() and open() -# functions can't get the correct exit status of a child. So we cant use +# functions can't get the correct exit status of a child. So we can't use # that (also see perlipc) my $WORKER_PIDS; @@ -47,7 +47,8 @@ my $log_task_result = sub { } my $tlist = $rest_env->active_workers($upid); - $rest_env->broadcast_tasklist($tlist); + eval { $rest_env->broadcast_tasklist($tlist); }; + syslog('err', $@) if $@; my $task; foreach my $t (@$tlist) { @@ -60,8 +61,7 @@ my $log_task_result = sub { $msg = $task->{status}; } - $rest_env->log_cluster_msg($pri, $user, "end task $upid $msg") - if $rest_env; + $rest_env->log_cluster_msg($pri, $user, "end task $upid $msg"); }; my $worker_reaper = sub { @@ -210,11 +210,25 @@ sub set_user { } sub get_user { - my ($self) = @_; + my ($self, $noerr) = @_; - die "user name not set\n" if !$self->{user}; + return $self->{user} if defined($self->{user}) || $noerr; - return $self->{user}; + die "user name not set\n"; +} + +sub set_u2f_challenge { + my ($self, $challenge) = @_; + + $self->{u2f_challenge} = $challenge; +} + +sub get_u2f_challenge { + my ($self, $noerr) = @_; + + return $self->{u2f_challenge} if defined($self->{u2f_challenge}) || $noerr; + + die "no active u2f challenge\n"; } sub is_worker { @@ -270,7 +284,7 @@ sub active_workers { } - @ta = sort { $b->{starttime} cmp $a->{starttime} } @ta; + @ta = sort { $b->{starttime} <=> $a->{starttime} } @ta; my $save = defined($new_upid); @@ -319,7 +333,6 @@ sub active_workers { # we try to reduce the amount of data # list all running tasks and task and a few others # try to limit to 25 tasks - my $ctime = time(); my $max = 25 - scalar(@$tlist); foreach my $task (@ta) { last if $max <= 0; @@ -358,7 +371,7 @@ my $kill_process_group = sub { }; sub check_worker { - my ($upid, $killit) = @_; + my ($self, $upid, $killit) = @_; my $task = PVE::Tools::upid_decode($upid); @@ -374,6 +387,78 @@ sub check_worker { return 1; } +# acts almost as tee: writes an output both to STDOUT and a task log, +# we differ as we're worker aware and look also at the childs control pipe, +# so we know if the function could be executed successfully or not. +my $tee_worker = sub { + my ($childfd, $ctrlfd, $taskfh, $cpid) = @_; + + eval { + my $int_count = 0; + local $SIG{INT} = local $SIG{QUIT} = local $SIG{TERM} = sub { + # always send signal to all pgrp members + my $kpid = -$cpid; + if ($int_count < 3) { + kill(15, $kpid); # send TERM signal + } else { + kill(9, $kpid); # send KILL signal + } + $int_count++; + }; + local $SIG{PIPE} = sub { die "broken pipe\n"; }; + + my $select = new IO::Select; + my $fh = IO::Handle->new_from_fd($childfd, 'r'); + $select->add($fh); + + my $readbuf = ''; + my $count; + while ($select->count) { + my @handles = $select->can_read(1); + if (scalar(@handles)) { + my $count = sysread ($handles[0], $readbuf, 4096); + if (!defined ($count)) { + my $err = $!; + die "sync pipe read error: $err\n"; + } + last if $count == 0; # eof + + print $readbuf; + select->flush(); + + print $taskfh $readbuf; + $taskfh->flush(); + } else { + # some commands daemonize without closing stdout + last if !PVE::ProcFSTools::check_process_running($cpid); + } + } + + # get status (error or OK) + POSIX::read($ctrlfd, $readbuf, 4096); + if ($readbuf =~ m/^TASK OK\n?$/) { + # skip printing to stdout + print $taskfh $readbuf; + } elsif ($readbuf =~ m/^TASK ERROR: (.*)\n?$/) { + print STDERR "$1\n"; + print $taskfh "\n$readbuf"; # ensure start on new line for webUI + } else { + die "got unexpected control message: $readbuf\n"; + } + $taskfh->flush(); + }; + my $err = $@; + + POSIX::close($childfd); + POSIX::close($ctrlfd); + + if ($err) { + $err =~ s/\n/ /mg; + print STDERR "$err\n"; + print $taskfh "TASK ERROR: $err\n"; + } +}; + # start long running workers # STDIN is redirected to /dev/null # STDOUT,STDERR are redirected to the filename returned by upid_decode @@ -397,6 +482,7 @@ sub fork_worker { my @psync = POSIX::pipe(); my @csync = POSIX::pipe(); + my @ctrlfd = POSIX::pipe() if $sync; my $node = $self->{nodename}; @@ -422,15 +508,26 @@ sub fork_worker { $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = sub { die "received interrupt\n"; }; $SIG{CHLD} = $SIG{PIPE} = 'DEFAULT'; - - # set sess/process group - we want to be able to kill the - # whole process group - POSIX::setsid(); + $SIG{TTOU} = 'IGNORE'; + + my $ppgid; + # set session/process group allows to kill the process group + if ($sync && -t STDIN) { + # some sync'ed workers operate on the tty but setsid sessions lose + # the tty, so just create a new pgroup and give it the tty + $ppgid = POSIX::getpgrp() or die "failed to get old pgid: $!\n"; + POSIX::setpgid(0, 0) or die "failed to setpgid: $!\n"; + POSIX::tcsetpgrp(fileno(STDIN), $$) or die "failed to tcsetpgrp: $!\n"; + } else { + POSIX::setsid(); + } POSIX::close ($psync[0]); + POSIX::close ($ctrlfd[0]) if $sync; POSIX::close ($csync[1]); $outfh = $sync ? $psync[1] : undef; + my $resfh = $sync ? $ctrlfd[1] : undef; eval { PVE::INotify::inotify_close(); @@ -439,7 +536,7 @@ sub fork_worker { &$atfork(); } - # same algorythm as used inside SA + # same algorithm as used inside SA # STDIN = /dev/null my $fd = fileno (STDIN); @@ -451,6 +548,7 @@ sub fork_worker { if !open(STDIN, "{type} eq 'ha') { - print "task started by HA resource agent\n"; - } - eval { &$function($upid); }; + if ($self->{type} eq 'ha') { + print "task started by HA resource agent\n"; + } + &$function($upid); + }; + my ($msg, $exitcode); my $err = $@; if ($err) { chomp $err; $err =~ s/\n/ /mg; syslog('err', $err); - print STDERR "TASK ERROR: $err\n"; - POSIX::_exit(-1); + $msg = "TASK ERROR: $err\n"; + $exitcode = -1; } else { - print STDERR "TASK OK\n"; - POSIX::_exit(0); + $msg = "TASK OK\n"; + $exitcode = 0; + } + POSIX::write($resfh, $msg, length($msg)); + + if ($sync) { + POSIX::close($resfh); + if ( -t STDIN) { + POSIX::tcsetpgrp(fileno(STDIN), $ppgid) or + die "failed to tcsetpgrp to parent: $!\n"; + } } - kill(-9, $$); + POSIX::_exit($exitcode); + kill(-9, $$); # not really needed, just to be sure } # parent POSIX::close ($psync[1]); + POSIX::close ($ctrlfd[1]) if $sync; POSIX::close ($csync[0]); my $readbuf = ''; @@ -560,81 +668,14 @@ sub fork_worker { $self->log_cluster_msg('info', $user, "starting task $upid"); my $tlist = $self->active_workers($upid, $sync); - $self->broadcast_tasklist($tlist); + eval { $self->broadcast_tasklist($tlist); }; + syslog('err', $@) if $@; my $res = 0; if ($sync) { - my $count; - my $outbuf = ''; - my $int_count = 0; - eval { - local $SIG{INT} = local $SIG{QUIT} = local $SIG{TERM} = sub { - # always send signal to all pgrp members - my $kpid = -$cpid; - if ($int_count < 3) { - kill(15, $kpid); # send TERM signal - } else { - kill(9, $kpid); # send KILL signal - } - $int_count++; - }; - local $SIG{PIPE} = sub { die "broken pipe\n"; }; - - my $select = new IO::Select; - my $fh = IO::Handle->new_from_fd($psync[0], 'r'); - $select->add($fh); - - while ($select->count) { - my @handles = $select->can_read(1); - if (scalar(@handles)) { - my $count = sysread ($handles[0], $readbuf, 4096); - if (!defined ($count)) { - my $err = $!; - die "sync pipe read error: $err\n"; - } - last if $count == 0; # eof - - $outbuf .= $readbuf; - while ($outbuf =~ s/^(([^\010\r\n]*)(\r|\n|(\010)+|\r\n))//s) { - my $line = $1; - my $data = $2; - if ($data =~ m/^TASK OK$/) { - # skip - } elsif ($data =~ m/^TASK ERROR: (.+)$/) { - print STDERR "$1\n"; - } else { - print $line; - } - if ($outfh) { - print $outfh $line; - $outfh->flush(); - } - } - } else { - # some commands daemonize without closing stdout - last if !PVE::ProcFSTools::check_process_running($cpid); - } - } - }; - my $err = $@; - POSIX::close($psync[0]); - - if ($outbuf) { # just to be sure - print $outbuf; - if ($outfh) { - print $outfh $outbuf; - } - } - - if ($err) { - $err =~ s/\n/ /mg; - print STDERR "$err\n"; - if ($outfh) { - print $outfh "TASK ERROR: $err\n"; - } - } + $tee_worker->($psync[0], $ctrlfd[0], $outfh, $cpid); &$kill_process_group($cpid, $pstart); # make sure it gets killed @@ -687,7 +728,9 @@ sub check_api2_permissions { sub init_request { my ($self, %params) = @_; - # implement in subclass + $self->{result_attributes} = {} + + # if you nedd more, implement in subclass } 1;