]> git.proxmox.com Git - pve-common.git/blob - src/PVE/RESTEnvironment.pm
bump version to 8.2.1
[pve-common.git] / src / PVE / RESTEnvironment.pm
1 package PVE::RESTEnvironment;
2
3 # NOTE: you can/should provide your own specialice class, and
4 # use this a bas class (as example see PVE::RPCEnvironment).
5
6 # we use this singleton class to pass RPC related environment values
7
8 use strict;
9 use warnings;
10
11 use Exporter qw(import);
12 use Fcntl qw(:flock);
13 use IO::File;
14 use IO::Handle;
15 use IO::Select;
16 use POSIX qw(:sys_wait_h EINTR);
17 use AnyEvent;
18
19 use PVE::Exception qw(raise raise_perm_exc);
20 use PVE::INotify;
21 use PVE::ProcFSTools;
22 use PVE::SafeSyslog;
23 use PVE::Tools;
24
25 our @EXPORT_OK = qw(log_warn);
26
27 my $rest_env;
28
29 # save $SIG{CHLD} handler implementation.
30 # simply set $SIG{CHLD} = $worker_reaper;
31 # and register forked processes with &$register_worker(pid)
32 # Note: using $SIG{CHLD} = 'IGNORE' or $SIG{CHLD} = sub { wait (); } or ...
33 # has serious side effects, because perls built in system() and open()
34 # functions can't get the correct exit status of a child. So we can't use
35 # that (also see perlipc)
36
37 my $WORKER_PIDS;
38 my $WORKER_FLAG = 0;
39
40 my $log_task_result = sub {
41 my ($upid, $user, $status) = @_;
42
43 return if !$rest_env;
44
45 my $msg = 'successful';
46 my $pri = 'info';
47 if ($status != 0) {
48 my $ec = $status >> 8;
49 my $ic = $status & 255;
50 $msg = $ec ? "failed ($ec)" : "interrupted ($ic)";
51 $pri = 'err';
52 }
53
54 my $tlist = $rest_env->active_workers($upid);
55 eval { $rest_env->broadcast_tasklist($tlist); };
56 syslog('err', $@) if $@;
57
58 my $task;
59 foreach my $t (@$tlist) {
60 if ($t->{upid} eq $upid) {
61 $task = $t;
62 last;
63 }
64 }
65 if ($task && $task->{status}) {
66 $msg = $task->{status};
67 }
68
69 $rest_env->log_cluster_msg($pri, $user, "end task $upid $msg");
70 };
71
72 my $worker_reaper = sub {
73 local $!; local $?;
74 foreach my $pid (keys %$WORKER_PIDS) {
75 my $waitpid = waitpid ($pid, WNOHANG);
76 if (defined($waitpid) && ($waitpid == $pid)) {
77 my $info = $WORKER_PIDS->{$pid};
78 if ($info && $info->{upid} && $info->{user}) {
79 &$log_task_result($info->{upid}, $info->{user}, $?);
80 }
81 delete ($WORKER_PIDS->{$pid});
82 }
83 }
84 };
85
86 my $register_worker = sub {
87 my ($pid, $user, $upid) = @_;
88
89 return if !$pid;
90
91 # do not register if already finished
92 my $waitpid = waitpid ($pid, WNOHANG);
93 if (defined($waitpid) && ($waitpid == $pid)) {
94 delete ($WORKER_PIDS->{$pid});
95 return;
96 }
97
98 $WORKER_PIDS->{$pid} = {
99 user => $user,
100 upid => $upid,
101 };
102 };
103
104 # initialize environment - must be called once at program startup
105 sub init {
106 my ($class, $type, %params) = @_;
107
108 $class = ref($class) || $class;
109
110 die "already initialized" if $rest_env;
111
112 die "unknown environment type"
113 if !$type || $type !~ m/^(cli|pub|priv|ha)$/;
114
115 $SIG{CHLD} = sub {
116 # when we're using AnyEvent, we have to postpone the call to worker_reaper, otherwise it
117 # might interfere with running api calls
118 if (defined($AnyEvent::MODEL)) {
119 AnyEvent::postpone { $worker_reaper->() };
120 } else {
121 $worker_reaper->();
122 }
123 };
124
125 # environment types
126 # cli ... command started fron command line
127 # pub ... access from public server (pveproxy)
128 # priv ... access from private server (pvedaemon)
129 # ha ... access from HA resource manager agent (pve-ha-manager)
130
131 my $self = {
132 type => $type,
133 warning_count => 0,
134 };
135
136 bless $self, $class;
137
138 foreach my $p (keys %params) {
139 if ($p eq 'atfork') {
140 $self->{$p} = $params{$p};
141 } else {
142 die "unknown option '$p'";
143 }
144 }
145
146 $rest_env = $self;
147
148 my ($sysname, $nodename) = POSIX::uname();
149
150 $nodename =~ s/\..*$//; # strip domain part, if any
151
152 $self->{nodename} = $nodename;
153
154 return $self;
155 };
156
157 # convenience function for command line tools
158 sub setup_default_cli_env {
159 my ($class, $username) = @_;
160
161 $class = ref($class) || $class;
162
163 $username //= 'root@pam';
164
165 PVE::INotify::inotify_init();
166
167 my $rpcenv = $class->init('cli');
168 $rpcenv->init_request();
169 $rpcenv->set_language($ENV{LANG});
170 $rpcenv->set_user($username);
171
172 die "please run as root\n"
173 if ($username eq 'root@pam') && ($> != 0);
174 }
175
176 # get the singleton
177 sub get {
178
179 die "REST environment not initialized" if !$rest_env;
180
181 return $rest_env;
182 }
183
184 sub set_client_ip {
185 my ($self, $ip) = @_;
186
187 $self->{client_ip} = $ip;
188 }
189
190 sub get_client_ip {
191 my ($self) = @_;
192
193 return $self->{client_ip};
194 }
195
196 sub set_result_attrib {
197 my ($self, $key, $value) = @_;
198
199 $self->{result_attributes}->{$key} = $value;
200 }
201
202 sub get_result_attrib {
203 my ($self, $key) = @_;
204
205 return $self->{result_attributes}->{$key};
206 }
207
208 sub set_language {
209 my ($self, $lang) = @_;
210
211 # fixme: initialize I18N
212
213 $self->{language} = $lang;
214 }
215
216 sub get_language {
217 my ($self) = @_;
218
219 return $self->{language};
220 }
221
222 sub set_user {
223 my ($self, $user) = @_;
224
225 $self->{user} = $user;
226 }
227
228 sub get_user {
229 my ($self, $noerr) = @_;
230
231 return $self->{user} if defined($self->{user}) || $noerr;
232
233 die "user name not set\n";
234 }
235
236 sub set_u2f_challenge {
237 my ($self, $challenge) = @_;
238
239 $self->{u2f_challenge} = $challenge;
240 }
241
242 sub get_u2f_challenge {
243 my ($self, $noerr) = @_;
244
245 return $self->{u2f_challenge} if defined($self->{u2f_challenge}) || $noerr;
246
247 die "no active u2f challenge\n";
248 }
249
250 sub set_request_host {
251 my ($self, $host) = @_;
252
253 $self->{request_host} = $host;
254 }
255
256 sub get_request_host {
257 my ($self, $noerr) = @_;
258
259 return $self->{request_host} if defined($self->{request_host}) || $noerr;
260
261 die "no hostname available in current environment\n";
262 }
263
264 sub is_worker {
265 my ($class) = @_;
266
267 return $WORKER_FLAG;
268 }
269
270 # read/update list of active workers.
271 #
272 # we move all finished tasks to the archive index, but keep active, and most recent tasks in the
273 # active file.
274 # $nocheck ... consider $new_upid still running (avoid that we try to read the result to early).
275 sub active_workers {
276 my ($self, $new_upid, $nocheck) = @_;
277
278 my $timeout = 10;
279
280 my $res = PVE::Tools::lock_file("/var/log/pve/tasks/.active.lock", $timeout, sub {
281 my $tasklist = PVE::INotify::read_file('active');
282
283 my @ta;
284 my $tlist = [];
285 my $thash = {}; # only list task once
286
287 my $check_task = sub {
288 my ($task, $running) = @_;
289
290 if ($running || PVE::ProcFSTools::check_process_running($task->{pid}, $task->{pstart})) {
291 push @$tlist, $task;
292 } else {
293 delete $task->{pid};
294 push @ta, $task;
295 }
296 delete $task->{pstart};
297 };
298
299 foreach my $task (@$tasklist) {
300 my $upid = $task->{upid};
301 next if $thash->{$upid};
302 $thash->{$upid} = $task;
303 &$check_task($task);
304 }
305
306 if ($new_upid && !$thash->{$new_upid}) {
307 my $task = PVE::Tools::upid_decode($new_upid);
308 $task->{upid} = $new_upid;
309 $thash->{$new_upid} = $task;
310 &$check_task($task, $nocheck);
311 }
312
313
314 @ta = sort { $b->{starttime} <=> $a->{starttime} } @ta;
315
316 my $save = defined($new_upid);
317
318 foreach my $task (@ta) {
319 next if $task->{endtime};
320 $task->{endtime} = time();
321 $task->{status} = PVE::Tools::upid_read_status($task->{upid});
322 $save = 1;
323 }
324
325 my $archive = '';
326 my @arlist = ();
327 foreach my $task (@ta) {
328 if (!$task->{saved}) {
329 $archive .= sprintf("%s %08X %s\n", $task->{upid}, $task->{endtime}, $task->{status});
330 $save = 1;
331 push @arlist, $task;
332 $task->{saved} = 1;
333 }
334 }
335
336 if ($archive) {
337 my $size = 0;
338 my $filename = "/var/log/pve/tasks/index";
339 eval {
340 my $fh = IO::File->new($filename, '>>', 0644) ||
341 die "unable to open file '$filename' - $!\n";
342 PVE::Tools::safe_print($filename, $fh, $archive);
343 $size = -s $fh;
344 close($fh) ||
345 die "unable to close file '$filename' - $!\n";
346 };
347 my $err = $@;
348 if ($err) {
349 syslog('err', $err);
350 foreach my $task (@arlist) { # mark as not saved
351 $task->{saved} = 0;
352 }
353 }
354 my $maxsize = 50000; # about 1000 entries
355 if ($size > $maxsize) {
356 rename($filename, "$filename.1");
357 }
358 }
359
360 # we try to reduce the amount of data list all running tasks and task and a few others
361 my $MAX_FINISHED = 25;
362 my $max = $MAX_FINISHED - scalar(@$tlist);
363 foreach my $task (@ta) {
364 last if $max <= 0;
365 push @$tlist, $task;
366 $max--;
367 }
368
369 PVE::INotify::write_file('active', $tlist) if $save;
370
371 return $tlist;
372 });
373 die $@ if $@;
374
375 return $res;
376 }
377
378 my $kill_process_group = sub {
379 my ($pid, $pstart) = @_;
380
381 # send kill to process group (negative pid)
382 my $kpid = -$pid;
383
384 # always send signal to all pgrp members
385 kill(15, $kpid); # send TERM signal
386
387 # give max 5 seconds to shut down
388 for (my $i = 0; $i < 5; $i++) {
389 return if !PVE::ProcFSTools::check_process_running($pid, $pstart);
390 sleep (1);
391 }
392
393 # to be sure
394 kill(9, $kpid);
395 };
396
397 sub check_worker {
398 my ($self, $upid, $killit) = @_;
399
400 my $task = PVE::Tools::upid_decode($upid);
401
402 my $running = PVE::ProcFSTools::check_process_running($task->{pid}, $task->{pstart});
403
404 return 0 if !$running;
405
406 if ($killit) {
407 &$kill_process_group($task->{pid});
408 return 0;
409 }
410
411 return 1;
412 }
413
414 # acts almost as tee: writes an output both to STDOUT and a task log,
415 # we differ as we're worker aware and look also at the childs control pipe,
416 # so we know if the function could be executed successfully or not.
417 my $tee_worker = sub {
418 my ($childfd, $ctrlfd, $taskfh, $cpid) = @_;
419
420 eval {
421 my $int_count = 0;
422 local $SIG{INT} = local $SIG{QUIT} = local $SIG{TERM} = sub {
423 # always send signal to all pgrp members
424 my $kpid = -$cpid;
425 if ($int_count < 3) {
426 kill(15, $kpid); # send TERM signal
427 } else {
428 kill(9, $kpid); # send KILL signal
429 }
430 $int_count++;
431 };
432 local $SIG{PIPE} = sub { die "broken pipe\n"; };
433
434 my $select = IO::Select->new();
435 my $fh = IO::Handle->new_from_fd($childfd, 'r');
436 $select->add($fh);
437
438 my $readbuf = '';
439 my $count;
440 while ($select->count) {
441 my @handles = $select->can_read(1);
442 if (scalar(@handles)) {
443 my $count = sysread ($handles[0], $readbuf, 4096);
444 if (!defined ($count)) {
445 my $err = $!;
446 die "sync pipe read error: $err\n";
447 }
448 last if $count == 0; # eof
449
450 print $readbuf;
451 select->flush();
452
453 print $taskfh $readbuf;
454 $taskfh->flush();
455 } else {
456 # some commands daemonize without closing stdout
457 last if !PVE::ProcFSTools::check_process_running($cpid);
458 }
459 }
460
461 POSIX::read($ctrlfd, $readbuf, 4096);
462 if ($readbuf =~ m/^TASK OK\n?$/) {
463 # skip printing to stdout
464 print $taskfh $readbuf;
465 } elsif ($readbuf =~ m/^TASK ERROR: (.*)\n?$/) {
466 print STDERR "$1\n";
467 print $taskfh "\n$readbuf"; # ensure start on new line for webUI
468 } elsif ($readbuf =~ m/^TASK WARNINGS: (\d+)\n?$/) {
469 print STDERR "Task finished with $1 warning(s)!\n";
470 print $taskfh "\n$readbuf"; # ensure start on new line for webUI
471 } else {
472 die "got unexpected control message: $readbuf\n";
473 }
474 $taskfh->flush();
475 };
476 my $err = $@;
477
478 POSIX::close($childfd);
479 POSIX::close($ctrlfd);
480
481 if ($err) {
482 $err =~ s/\n/ /mg;
483 print STDERR "$err\n";
484 print $taskfh "TASK ERROR: $err\n";
485 }
486 };
487
488 # start long running workers
489 # STDIN is redirected to /dev/null
490 # STDOUT,STDERR are redirected to the filename returned by upid_decode
491 # NOTE: we simulate running in foreground if ($self->{type} eq 'cli')
492 sub fork_worker {
493 my ($self, $dtype, $id, $user, $function, $background) = @_;
494
495 $dtype = 'unknown' if !defined ($dtype);
496 $id = '' if !defined ($id);
497
498 # note: below is only used for the task log entry
499 $user = $self->get_user(1) // 'root@pam' if !defined($user);
500
501 my $sync = ($self->{type} eq 'cli' && !$background) ? 1 : 0;
502
503 local $SIG{INT} =
504 local $SIG{QUIT} =
505 local $SIG{PIPE} =
506 local $SIG{TERM} = 'IGNORE';
507
508 my $starttime = time ();
509
510 my @psync = POSIX::pipe();
511 my @csync = POSIX::pipe();
512 my @ctrlfd = $sync ? POSIX::pipe() : ();
513
514 my $node = $self->{nodename};
515
516 my $cpid = fork();
517 die "unable to fork worker - $!" if !defined($cpid);
518
519 my $workerpuid = $cpid ? $cpid : $$;
520
521 my $pstart = PVE::ProcFSTools::read_proc_starttime($workerpuid) ||
522 die "unable to read process start time";
523
524 my $upid = PVE::Tools::upid_encode ({
525 node => $node, pid => $workerpuid, pstart => $pstart,
526 starttime => $starttime, type => $dtype, id => $id, user => $user });
527
528 my $outfh;
529
530 if (!$cpid) { # child
531
532 $0 = "task $upid";
533 $WORKER_FLAG = 1;
534
535 $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = sub { die "received interrupt\n"; };
536
537 $SIG{CHLD} = $SIG{PIPE} = 'DEFAULT';
538 $SIG{TTOU} = 'IGNORE';
539
540 my $ppgid;
541 # set session/process group allows to kill the process group
542 if ($sync && -t STDIN) {
543 # some sync'ed workers operate on the tty but setsid sessions lose
544 # the tty, so just create a new pgroup and give it the tty
545 $ppgid = POSIX::getpgrp() or die "failed to get old pgid: $!\n";
546 POSIX::setpgid(0, 0) or die "failed to setpgid: $!\n";
547 POSIX::tcsetpgrp(fileno(STDIN), $$) or die "failed to tcsetpgrp: $!\n";
548 } else {
549 POSIX::setsid();
550 }
551
552 POSIX::close ($psync[0]);
553 POSIX::close ($ctrlfd[0]) if $sync;
554 POSIX::close ($csync[1]);
555
556 $outfh = $sync ? $psync[1] : undef;
557 my $resfh = $sync ? $ctrlfd[1] : undef;
558
559 eval {
560 PVE::INotify::inotify_close();
561
562 if (my $atfork = $self->{atfork}) {
563 &$atfork();
564 }
565
566 # same algorithm as used inside SA
567 # STDIN = /dev/null
568 my $fd = fileno (STDIN);
569
570 if (!$sync) {
571 close STDIN;
572 POSIX::close(0) if $fd != 0;
573
574 open(STDIN, '<', '/dev/null') or die "unable to redirect STDIN - $!";
575
576 $outfh = PVE::Tools::upid_open($upid);
577 $resfh = fileno($outfh);
578 }
579
580
581 # redirect STDOUT
582 $fd = fileno(STDOUT);
583 close STDOUT;
584 POSIX::close (1) if $fd != 1;
585
586 open(STDOUT, ">&", $outfh) or die "unable to redirect STDOUT - $!";
587
588 STDOUT->autoflush (1);
589
590 # redirect STDERR to STDOUT
591 $fd = fileno (STDERR);
592 close STDERR;
593 POSIX::close(2) if $fd != 2;
594
595 open(STDERR, '>&', '1') or die "unable to redirect STDERR - $!";
596
597 STDERR->autoflush(1);
598 };
599 if (my $err = $@) {
600 my $msg = "ERROR: $err";
601 POSIX::write($psync[1], $msg, length ($msg));
602 POSIX::close($psync[1]);
603 POSIX::_exit(1);
604 kill(-9, $$);
605 }
606
607 # sync with parent (signal that we are ready)
608 POSIX::write($psync[1], $upid, length ($upid));
609 POSIX::close($psync[1]) if !$sync; # don't need output pipe if async
610
611 eval {
612 my $readbuf = '';
613 # sync with parent (wait until parent is ready)
614 POSIX::read($csync[0], $readbuf, 4096);
615 die "parent setup error\n" if $readbuf ne 'OK';
616
617 if ($self->{type} eq 'ha') {
618 print "task started by HA resource agent\n";
619 }
620 &$function($upid);
621 };
622 my ($msg, $exitcode);
623 my $err = $@;
624 if ($err) {
625 chomp $err;
626 $err =~ s/\n/ /mg;
627 syslog('err', $err);
628 $msg = "TASK ERROR: $err\n";
629 $exitcode = -1;
630 } elsif (my $warnings = $self->{warning_count}) {
631 $msg = "TASK WARNINGS: $warnings\n";
632 $exitcode = 0;
633 } else {
634 $msg = "TASK OK\n";
635 $exitcode = 0;
636 }
637 POSIX::write($resfh, $msg, length($msg));
638
639 if ($sync) {
640 POSIX::close($resfh);
641 if ( -t STDIN) {
642 POSIX::tcsetpgrp(fileno(STDIN), $ppgid) or
643 die "failed to tcsetpgrp to parent: $!\n";
644 }
645 }
646 POSIX::_exit($exitcode);
647 kill(-9, $$); # not really needed, just to be sure
648 }
649
650 # parent
651
652 POSIX::close ($psync[1]);
653 POSIX::close ($ctrlfd[1]) if $sync;
654 POSIX::close ($csync[0]);
655
656 my $readbuf = '';
657 # sync with child (wait until child starts)
658 POSIX::read($psync[0], $readbuf, 4096);
659
660 if (!$sync) {
661 POSIX::close($psync[0]);
662 &$register_worker($cpid, $user, $upid);
663 } else {
664 chomp $readbuf;
665 }
666
667 eval {
668 die "got no worker upid - start worker failed\n" if !$readbuf;
669
670 if ($readbuf =~ m/^ERROR:\s*(.+)$/m) {
671 die "starting worker failed: $1\n";
672 }
673
674 if ($readbuf ne $upid) {
675 die "got strange worker upid ('$readbuf' != '$upid') - start worker failed\n";
676 }
677
678 if ($sync) {
679 $outfh = PVE::Tools::upid_open($upid);
680 }
681 };
682 my $err = $@;
683
684 if (!$err) {
685 my $msg = 'OK';
686 POSIX::write($csync[1], $msg, length ($msg));
687 POSIX::close($csync[1]);
688
689 } else {
690 POSIX::close($csync[1]);
691 kill(-9, $cpid); # make sure it gets killed
692 die $err;
693 }
694
695 $self->log_cluster_msg('info', $user, "starting task $upid");
696
697 my $tlist = $self->active_workers($upid, $sync);
698 eval { $self->broadcast_tasklist($tlist); };
699 syslog('err', $@) if $@;
700
701 my $res = 0;
702
703 if ($sync) {
704
705 $tee_worker->($psync[0], $ctrlfd[0], $outfh, $cpid);
706
707 &$kill_process_group($cpid, $pstart); # make sure it gets killed
708
709 close($outfh);
710
711 waitpid($cpid, 0);
712 $res = $?;
713 &$log_task_result($upid, $user, $res);
714 }
715
716 return wantarray ? ($upid, $res) : $upid;
717 }
718
719 sub log_warn {
720 my ($message) = @_;
721
722 if ($rest_env) {
723 $rest_env->warn($message);
724 } else {
725 chomp($message);
726 print STDERR "WARN: $message\n";
727 }
728 }
729
730 sub warn {
731 my ($self, $message) = @_;
732
733 chomp($message);
734
735 print STDERR "WARN: $message\n";
736
737 $self->{warning_count}++;
738 }
739
740 # Abstract function
741
742 sub log_cluster_msg {
743 my ($self, $pri, $user, $msg) = @_;
744
745 syslog($pri, "%s", $msg);
746
747 # PVE::Cluster::log_msg($pri, $user, $msg);
748 }
749
750 sub broadcast_tasklist {
751 my ($self, $tlist) = @_;
752
753 # PVE::Cluster::broadcast_tasklist($tlist);
754 }
755
756 sub check_api2_permissions {
757 my ($self, $perm, $username, $param) = @_;
758
759 return 1 if !$username && $perm->{user} eq 'world';
760
761 raise_perm_exc("user != null") if !$username;
762
763 return 1 if $username eq 'root@pam';
764
765 raise_perm_exc('user != root@pam') if !$perm;
766
767 return 1 if $perm->{user} && $perm->{user} eq 'all';
768
769 ##return $self->exec_api2_perm_check($perm->{check}, $username, $param)
770 ##if $perm->{check};
771
772 raise_perm_exc();
773 }
774
775 # init_request - should be called before each REST/CLI request
776 sub init_request {
777 my ($self, %params) = @_;
778
779 $self->{result_attributes} = {}
780
781 # if you nedd more, implement in subclass
782 }
783
784 1;