]> git.proxmox.com Git - pve-common.git/blobdiff - src/PVE/RESTEnvironment.pm
bump version to 8.2.1
[pve-common.git] / src / PVE / RESTEnvironment.pm
index 0ad6dbaefc28c96067cae96901ebb845be570c9c..191c6ebf6f62250b47c7b4aee163d2adeb847685 100644 (file)
@@ -7,17 +7,22 @@ package PVE::RESTEnvironment;
 
 use strict;
 use warnings;
-use POSIX qw(:sys_wait_h EINTR);
-use IO::Handle;
+
+use Exporter qw(import);
+use Fcntl qw(:flock);
 use IO::File;
+use IO::Handle;
 use IO::Select;
-use Fcntl qw(:flock);
+use POSIX qw(:sys_wait_h EINTR);
+use AnyEvent;
+
 use PVE::Exception qw(raise raise_perm_exc);
-use PVE::SafeSyslog;
-use PVE::Tools;
 use PVE::INotify;
 use PVE::ProcFSTools;
+use PVE::SafeSyslog;
+use PVE::Tools;
 
+our @EXPORT_OK = qw(log_warn);
 
 my $rest_env;
 
@@ -26,7 +31,7 @@ my $rest_env;
 # and register forked processes with &$register_worker(pid)
 # Note: using $SIG{CHLD} = 'IGNORE' or $SIG{CHLD} = sub { wait (); } or ...
 # has serious side effects, because perls built in system() and open()
-# functions can't get the correct exit status of a child. So we cant use
+# functions can't get the correct exit status of a child. So we can't use
 # that (also see perlipc)
 
 my $WORKER_PIDS;
@@ -107,15 +112,26 @@ sub init {
     die "unknown environment type"
        if !$type || $type !~ m/^(cli|pub|priv|ha)$/;
 
-    $SIG{CHLD} = $worker_reaper;
+    $SIG{CHLD} = sub {
+       # when we're using AnyEvent, we have to postpone the call to worker_reaper, otherwise it
+       # might interfere with running api calls
+       if (defined($AnyEvent::MODEL)) {
+           AnyEvent::postpone { $worker_reaper->() };
+       } else {
+           $worker_reaper->();
+       }
+    };
 
     # environment types
     # cli  ... command started fron command line
-    # pub  ... access from public server (apache)
+    # pub  ... access from public server (pveproxy)
     # priv ... access from private server (pvedaemon)
-    # ha   ... access from HA resource manager agent (rgmanager)
+    # ha   ... access from HA resource manager agent (pve-ha-manager)
 
-    my $self = { type => $type };
+    my $self = {
+       type => $type,
+       warning_count => 0,
+    };
 
     bless $self, $class;
 
@@ -217,26 +233,51 @@ sub get_user {
     die "user name not set\n";
 }
 
+sub set_u2f_challenge {
+    my ($self, $challenge) = @_;
+
+    $self->{u2f_challenge} = $challenge;
+}
+
+sub get_u2f_challenge {
+    my ($self, $noerr) = @_;
+
+    return $self->{u2f_challenge} if defined($self->{u2f_challenge}) || $noerr;
+
+    die "no active u2f challenge\n";
+}
+
+sub set_request_host {
+    my ($self, $host) = @_;
+
+    $self->{request_host} = $host;
+}
+
+sub get_request_host {
+    my ($self, $noerr) = @_;
+
+    return $self->{request_host} if defined($self->{request_host}) || $noerr;
+
+    die "no hostname available in current environment\n";
+}
+
 sub is_worker {
     my ($class) = @_;
 
     return $WORKER_FLAG;
 }
 
-# read/update list of active workers
-# we move all finished tasks to the archive index,
-# but keep aktive and most recent task in the active file.
-# $nocheck ... consider $new_upid still running (avoid that
-# we try to read the reult to early.
-sub active_workers  {
+# read/update list of active workers.
+#
+# we move all finished tasks to the archive index, but keep active, and most recent tasks in the
+# active file.
+# $nocheck ... consider $new_upid still running (avoid that we try to read the result to early).
+sub active_workers {
     my ($self, $new_upid, $nocheck) = @_;
 
-    my $lkfn = "/var/log/pve/tasks/.active.lock";
-
     my $timeout = 10;
 
-    my $code = sub {
-
+    my $res = PVE::Tools::lock_file("/var/log/pve/tasks/.active.lock", $timeout, sub {
        my $tasklist = PVE::INotify::read_file('active');
 
        my @ta;
@@ -262,15 +303,15 @@ sub active_workers  {
            &$check_task($task);
        }
 
-       if ($new_upid && !(my $task = $thash->{$new_upid})) {
-           $task = PVE::Tools::upid_decode($new_upid);
+       if ($new_upid && !$thash->{$new_upid}) {
+           my $task = PVE::Tools::upid_decode($new_upid);
            $task->{upid} = $new_upid;
            $thash->{$new_upid} = $task;
            &$check_task($task, $nocheck);
        }
 
 
-       @ta = sort { $b->{starttime} cmp $a->{starttime} } @ta;
+       @ta = sort { $b->{starttime} <=> $a->{starttime} } @ta;
 
        my $save = defined($new_upid);
 
@@ -316,10 +357,9 @@ sub active_workers  {
            }
        }
 
-       # we try to reduce the amount of data
-       # list all running tasks and task and a few others
-       # try to limit to 25 tasks
-       my $max = 25 - scalar(@$tlist);
+       # we try to reduce the amount of data list all running tasks and task and a few others
+       my $MAX_FINISHED = 25;
+       my $max = $MAX_FINISHED - scalar(@$tlist);
         foreach my $task (@ta) {
            last if $max <= 0;
            push @$tlist, $task;
@@ -329,9 +369,7 @@ sub active_workers  {
        PVE::INotify::write_file('active', $tlist) if $save;
 
        return $tlist;
-    };
-
-    my $res = PVE::Tools::lock_file($lkfn, $timeout, $code);
+    });
     die $@ if $@;
 
     return $res;
@@ -373,6 +411,80 @@ sub check_worker {
     return 1;
 }
 
+# acts almost as tee: writes an output both to STDOUT and a task log,
+# we differ as we're worker aware and look also at the childs control pipe,
+# so we know if the function could be executed successfully or not.
+my $tee_worker = sub {
+    my ($childfd, $ctrlfd, $taskfh, $cpid) = @_;
+
+    eval {
+       my $int_count = 0;
+       local $SIG{INT} = local $SIG{QUIT} = local $SIG{TERM} = sub {
+           # always send signal to all pgrp members
+           my $kpid = -$cpid;
+           if ($int_count < 3) {
+               kill(15, $kpid); # send TERM signal
+           } else {
+               kill(9, $kpid); # send KILL signal
+           }
+           $int_count++;
+       };
+       local $SIG{PIPE} = sub { die "broken pipe\n"; };
+
+       my $select = IO::Select->new();
+       my $fh = IO::Handle->new_from_fd($childfd, 'r');
+       $select->add($fh);
+
+       my $readbuf = '';
+       my $count;
+       while ($select->count) {
+           my @handles = $select->can_read(1);
+           if (scalar(@handles)) {
+               my $count = sysread ($handles[0], $readbuf, 4096);
+               if (!defined ($count)) {
+                   my $err = $!;
+                   die "sync pipe read error: $err\n";
+               }
+               last if $count == 0; # eof
+
+               print $readbuf;
+               select->flush();
+
+               print $taskfh $readbuf;
+               $taskfh->flush();
+           } else {
+               # some commands daemonize without closing stdout
+               last if !PVE::ProcFSTools::check_process_running($cpid);
+           }
+       }
+
+       POSIX::read($ctrlfd, $readbuf, 4096);
+       if ($readbuf =~ m/^TASK OK\n?$/) {
+           # skip printing to stdout
+           print $taskfh $readbuf;
+       } elsif ($readbuf =~ m/^TASK ERROR: (.*)\n?$/) {
+           print STDERR "$1\n";
+           print $taskfh "\n$readbuf"; # ensure start on new line for webUI
+       } elsif ($readbuf =~ m/^TASK WARNINGS: (\d+)\n?$/) {
+           print STDERR "Task finished with $1 warning(s)!\n";
+           print $taskfh "\n$readbuf"; # ensure start on new line for webUI
+       } else {
+           die "got unexpected control message: $readbuf\n";
+       }
+       $taskfh->flush();
+    };
+    my $err = $@;
+
+    POSIX::close($childfd);
+    POSIX::close($ctrlfd);
+
+    if ($err) {
+       $err =~ s/\n/ /mg;
+       print STDERR "$err\n";
+       print $taskfh "TASK ERROR: $err\n";
+    }
+};
+
 # start long running workers
 # STDIN is redirected to /dev/null
 # STDOUT,STDERR are redirected to the filename returned by upid_decode
@@ -383,7 +495,8 @@ sub fork_worker {
     $dtype = 'unknown' if !defined ($dtype);
     $id = '' if !defined ($id);
 
-    $user = 'root@pve' if !defined ($user);
+    # note: below is only used for the task log entry
+    $user = $self->get_user(1) // 'root@pam' if !defined($user);
 
     my $sync = ($self->{type} eq 'cli' && !$background) ? 1 : 0;
 
@@ -396,6 +509,7 @@ sub fork_worker {
 
     my @psync = POSIX::pipe();
     my @csync = POSIX::pipe();
+    my @ctrlfd = $sync ? POSIX::pipe() : ();
 
     my $node = $self->{nodename};
 
@@ -421,15 +535,26 @@ sub fork_worker {
        $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = sub { die "received interrupt\n"; };
 
        $SIG{CHLD} = $SIG{PIPE} = 'DEFAULT';
-
-       # set sess/process group - we want to be able to kill the
-       # whole process group
-       POSIX::setsid();
+       $SIG{TTOU} = 'IGNORE';
+
+       my $ppgid;
+       # set session/process group allows to kill the process group
+       if ($sync && -t STDIN) {
+           # some sync'ed workers operate on the tty but setsid sessions lose
+           # the tty, so just create a new pgroup and give it the tty
+           $ppgid = POSIX::getpgrp() or die "failed to get old pgid: $!\n";
+           POSIX::setpgid(0, 0) or die "failed to setpgid: $!\n";
+           POSIX::tcsetpgrp(fileno(STDIN), $$) or die "failed to tcsetpgrp: $!\n";
+       } else {
+           POSIX::setsid();
+       }
 
        POSIX::close ($psync[0]);
+       POSIX::close ($ctrlfd[0]) if $sync;
        POSIX::close ($csync[1]);
 
        $outfh = $sync ? $psync[1] : undef;
+       my $resfh = $sync ? $ctrlfd[1] : undef;
 
        eval {
            PVE::INotify::inotify_close();
@@ -438,7 +563,7 @@ sub fork_worker {
                &$atfork();
            }
 
-           # same algorythm as used inside SA
+           # same algorithm as used inside SA
            # STDIN = /dev/null
            my $fd = fileno (STDIN);
 
@@ -446,10 +571,10 @@ sub fork_worker {
                close STDIN;
                POSIX::close(0) if $fd != 0;
 
-               die "unable to redirect STDIN - $!"
-                   if !open(STDIN, "</dev/null");
+               open(STDIN, '<', '/dev/null') or die "unable to redirect STDIN - $!";
 
                $outfh = PVE::Tools::upid_open($upid);
+               $resfh = fileno($outfh);
            }
 
 
@@ -458,8 +583,7 @@ sub fork_worker {
            close STDOUT;
            POSIX::close (1) if $fd != 1;
 
-           die "unable to redirect STDOUT - $!"
-               if !open(STDOUT, ">&", $outfh);
+           open(STDOUT, ">&", $outfh) or die "unable to redirect STDOUT - $!";
 
            STDOUT->autoflush (1);
 
@@ -468,8 +592,7 @@ sub fork_worker {
            close STDERR;
            POSIX::close(2) if $fd != 2;
 
-           die "unable to redirect STDERR - $!"
-               if !open(STDERR, ">&1");
+           open(STDERR, '>&', '1') or die "unable to redirect STDERR - $!";
 
            STDERR->autoflush(1);
        };
@@ -485,32 +608,49 @@ sub fork_worker {
        POSIX::write($psync[1], $upid, length ($upid));
        POSIX::close($psync[1]) if !$sync; # don't need output pipe if async
 
-       my $readbuf = '';
-       # sync with parent (wait until parent is ready)
-       POSIX::read($csync[0], $readbuf, 4096);
-       die "parent setup error\n" if $readbuf ne 'OK';
+       eval {
+           my $readbuf = '';
+           # sync with parent (wait until parent is ready)
+           POSIX::read($csync[0], $readbuf, 4096);
+           die "parent setup error\n" if $readbuf ne 'OK';
 
-       if ($self->{type} eq 'ha') {
-           print "task started by HA resource agent\n";
-       }
-       eval { &$function($upid); };
+           if ($self->{type} eq 'ha') {
+               print "task started by HA resource agent\n";
+           }
+           &$function($upid);
+       };
+       my ($msg, $exitcode);
        my $err = $@;
        if ($err) {
            chomp $err;
            $err =~ s/\n/ /mg;
            syslog('err', $err);
-           print STDERR "TASK ERROR: $err\n";
-           POSIX::_exit(-1);
+           $msg = "TASK ERROR: $err\n";
+           $exitcode = -1;
+       } elsif (my $warnings = $self->{warning_count}) {
+           $msg = "TASK WARNINGS: $warnings\n";
+           $exitcode = 0;
        } else {
-           print STDERR "TASK OK\n";
-           POSIX::_exit(0);
+           $msg = "TASK OK\n";
+           $exitcode = 0;
        }
-       kill(-9, $$);
+       POSIX::write($resfh, $msg, length($msg));
+
+       if ($sync) {
+           POSIX::close($resfh);
+           if ( -t STDIN) {
+               POSIX::tcsetpgrp(fileno(STDIN), $ppgid) or
+                   die "failed to tcsetpgrp to parent: $!\n";
+           }
+       }
+       POSIX::_exit($exitcode);
+       kill(-9, $$); # not really needed, just to be sure
     }
 
     # parent
 
     POSIX::close ($psync[1]);
+    POSIX::close ($ctrlfd[1]) if $sync;
     POSIX::close ($csync[0]);
 
     my $readbuf = '';
@@ -561,76 +701,8 @@ sub fork_worker {
     my $res = 0;
 
     if ($sync) {
-       my $count;
-       my $outbuf = '';
-       my $int_count = 0;
-       eval {
-           local $SIG{INT} = local $SIG{QUIT} = local $SIG{TERM} = sub {
-               # always send signal to all pgrp members
-               my $kpid = -$cpid;
-               if ($int_count < 3) {
-                   kill(15, $kpid); # send TERM signal
-               } else {
-                   kill(9, $kpid); # send KILL signal
-               }
-               $int_count++;
-           };
-           local $SIG{PIPE} = sub { die "broken pipe\n"; };
-
-           my $select = new IO::Select;
-           my $fh = IO::Handle->new_from_fd($psync[0], 'r');
-           $select->add($fh);
-
-           while ($select->count) {
-               my @handles = $select->can_read(1);
-               if (scalar(@handles)) {
-                   my $count = sysread ($handles[0], $readbuf, 4096);
-                   if (!defined ($count)) {
-                       my $err = $!;
-                       die "sync pipe read error: $err\n";
-                   }
-                   last if $count == 0; # eof
-
-                   $outbuf .= $readbuf;
-                   while ($outbuf =~ s/^(([^\010\r\n]*)(\r|\n|(\010)+|\r\n))//s) {
-                       my $line = $1;
-                       my $data = $2;
-                       if ($data =~ m/^TASK OK$/) {
-                           # skip
-                       } elsif ($data =~ m/^TASK ERROR: (.+)$/) {
-                           print STDERR "$1\n";
-                       } else {
-                           print $line;
-                       }
-                       if ($outfh) {
-                           print $outfh $line;
-                           $outfh->flush();
-                       }
-                   }
-               } else {
-                   # some commands daemonize without closing stdout
-                   last if !PVE::ProcFSTools::check_process_running($cpid);
-               }
-           }
-       };
-       my $err = $@;
 
-       POSIX::close($psync[0]);
-
-       if ($outbuf) { # just to be sure
-           print $outbuf;
-           if ($outfh) {
-               print $outfh $outbuf;
-           }
-       }
-
-       if ($err) {
-           $err =~ s/\n/ /mg;
-           print STDERR "$err\n";
-           if ($outfh) {
-               print $outfh "TASK ERROR: $err\n";
-           }
-       }
+       $tee_worker->($psync[0], $ctrlfd[0], $outfh, $cpid);
 
        &$kill_process_group($cpid, $pstart); # make sure it gets killed
 
@@ -644,6 +716,27 @@ sub fork_worker {
     return wantarray ? ($upid, $res) : $upid;
 }
 
+sub log_warn {
+    my ($message) = @_;
+
+    if ($rest_env) {
+       $rest_env->warn($message);
+    } else {
+       chomp($message);
+       print STDERR "WARN: $message\n";
+    }
+}
+
+sub warn {
+    my ($self, $message) = @_;
+
+    chomp($message);
+
+    print STDERR "WARN: $message\n";
+
+    $self->{warning_count}++;
+}
+
 # Abstract function
 
 sub log_cluster_msg {