]> git.proxmox.com Git - pve-common.git/blob - src/PVE/RESTEnvironment.pm
bump version to 8.1.2
[pve-common.git] / src / PVE / RESTEnvironment.pm
1 package PVE::RESTEnvironment;
2
3 # NOTE: you can/should provide your own specialice class, and
4 # use this a bas class (as example see PVE::RPCEnvironment).
5
6 # we use this singleton class to pass RPC related environment values
7
8 use strict;
9 use warnings;
10
11 use Fcntl qw(:flock);
12 use IO::File;
13 use IO::Handle;
14 use IO::Select;
15 use POSIX qw(:sys_wait_h EINTR);
16
17 use PVE::Exception qw(raise raise_perm_exc);
18 use PVE::INotify;
19 use PVE::ProcFSTools;
20 use PVE::SafeSyslog;
21 use PVE::Tools;
22
23 my $rest_env;
24
25 # save $SIG{CHLD} handler implementation.
26 # simply set $SIG{CHLD} = $worker_reaper;
27 # and register forked processes with &$register_worker(pid)
28 # Note: using $SIG{CHLD} = 'IGNORE' or $SIG{CHLD} = sub { wait (); } or ...
29 # has serious side effects, because perls built in system() and open()
30 # functions can't get the correct exit status of a child. So we can't use
31 # that (also see perlipc)
32
33 my $WORKER_PIDS;
34 my $WORKER_FLAG = 0;
35
36 my $log_task_result = sub {
37 my ($upid, $user, $status) = @_;
38
39 return if !$rest_env;
40
41 my $msg = 'successful';
42 my $pri = 'info';
43 if ($status != 0) {
44 my $ec = $status >> 8;
45 my $ic = $status & 255;
46 $msg = $ec ? "failed ($ec)" : "interrupted ($ic)";
47 $pri = 'err';
48 }
49
50 my $tlist = $rest_env->active_workers($upid);
51 eval { $rest_env->broadcast_tasklist($tlist); };
52 syslog('err', $@) if $@;
53
54 my $task;
55 foreach my $t (@$tlist) {
56 if ($t->{upid} eq $upid) {
57 $task = $t;
58 last;
59 }
60 }
61 if ($task && $task->{status}) {
62 $msg = $task->{status};
63 }
64
65 $rest_env->log_cluster_msg($pri, $user, "end task $upid $msg");
66 };
67
68 my $worker_reaper = sub {
69 local $!; local $?;
70 foreach my $pid (keys %$WORKER_PIDS) {
71 my $waitpid = waitpid ($pid, WNOHANG);
72 if (defined($waitpid) && ($waitpid == $pid)) {
73 my $info = $WORKER_PIDS->{$pid};
74 if ($info && $info->{upid} && $info->{user}) {
75 &$log_task_result($info->{upid}, $info->{user}, $?);
76 }
77 delete ($WORKER_PIDS->{$pid});
78 }
79 }
80 };
81
82 my $register_worker = sub {
83 my ($pid, $user, $upid) = @_;
84
85 return if !$pid;
86
87 # do not register if already finished
88 my $waitpid = waitpid ($pid, WNOHANG);
89 if (defined($waitpid) && ($waitpid == $pid)) {
90 delete ($WORKER_PIDS->{$pid});
91 return;
92 }
93
94 $WORKER_PIDS->{$pid} = {
95 user => $user,
96 upid => $upid,
97 };
98 };
99
100 # initialize environment - must be called once at program startup
101 sub init {
102 my ($class, $type, %params) = @_;
103
104 $class = ref($class) || $class;
105
106 die "already initialized" if $rest_env;
107
108 die "unknown environment type"
109 if !$type || $type !~ m/^(cli|pub|priv|ha)$/;
110
111 $SIG{CHLD} = $worker_reaper;
112
113 # environment types
114 # cli ... command started fron command line
115 # pub ... access from public server (pveproxy)
116 # priv ... access from private server (pvedaemon)
117 # ha ... access from HA resource manager agent (pve-ha-manager)
118
119 my $self = {
120 type => $type,
121 warning_count => 0,
122 };
123
124 bless $self, $class;
125
126 foreach my $p (keys %params) {
127 if ($p eq 'atfork') {
128 $self->{$p} = $params{$p};
129 } else {
130 die "unknown option '$p'";
131 }
132 }
133
134 $rest_env = $self;
135
136 my ($sysname, $nodename) = POSIX::uname();
137
138 $nodename =~ s/\..*$//; # strip domain part, if any
139
140 $self->{nodename} = $nodename;
141
142 return $self;
143 };
144
145 # convenience function for command line tools
146 sub setup_default_cli_env {
147 my ($class, $username) = @_;
148
149 $class = ref($class) || $class;
150
151 $username //= 'root@pam';
152
153 PVE::INotify::inotify_init();
154
155 my $rpcenv = $class->init('cli');
156 $rpcenv->init_request();
157 $rpcenv->set_language($ENV{LANG});
158 $rpcenv->set_user($username);
159
160 die "please run as root\n"
161 if ($username eq 'root@pam') && ($> != 0);
162 }
163
164 # get the singleton
165 sub get {
166
167 die "REST environment not initialized" if !$rest_env;
168
169 return $rest_env;
170 }
171
172 sub set_client_ip {
173 my ($self, $ip) = @_;
174
175 $self->{client_ip} = $ip;
176 }
177
178 sub get_client_ip {
179 my ($self) = @_;
180
181 return $self->{client_ip};
182 }
183
184 sub set_result_attrib {
185 my ($self, $key, $value) = @_;
186
187 $self->{result_attributes}->{$key} = $value;
188 }
189
190 sub get_result_attrib {
191 my ($self, $key) = @_;
192
193 return $self->{result_attributes}->{$key};
194 }
195
196 sub set_language {
197 my ($self, $lang) = @_;
198
199 # fixme: initialize I18N
200
201 $self->{language} = $lang;
202 }
203
204 sub get_language {
205 my ($self) = @_;
206
207 return $self->{language};
208 }
209
210 sub set_user {
211 my ($self, $user) = @_;
212
213 $self->{user} = $user;
214 }
215
216 sub get_user {
217 my ($self, $noerr) = @_;
218
219 return $self->{user} if defined($self->{user}) || $noerr;
220
221 die "user name not set\n";
222 }
223
224 sub set_u2f_challenge {
225 my ($self, $challenge) = @_;
226
227 $self->{u2f_challenge} = $challenge;
228 }
229
230 sub get_u2f_challenge {
231 my ($self, $noerr) = @_;
232
233 return $self->{u2f_challenge} if defined($self->{u2f_challenge}) || $noerr;
234
235 die "no active u2f challenge\n";
236 }
237
238 sub set_request_host {
239 my ($self, $host) = @_;
240
241 $self->{request_host} = $host;
242 }
243
244 sub get_request_host {
245 my ($self, $noerr) = @_;
246
247 return $self->{request_host} if defined($self->{request_host}) || $noerr;
248
249 die "no hostname available in current environment\n";
250 }
251
252 sub is_worker {
253 my ($class) = @_;
254
255 return $WORKER_FLAG;
256 }
257
258 # read/update list of active workers
259 # we move all finished tasks to the archive index,
260 # but keep aktive and most recent task in the active file.
261 # $nocheck ... consider $new_upid still running (avoid that
262 # we try to read the reult to early.
263 sub active_workers {
264 my ($self, $new_upid, $nocheck) = @_;
265
266 my $lkfn = "/var/log/pve/tasks/.active.lock";
267
268 my $timeout = 10;
269
270 my $code = sub {
271
272 my $tasklist = PVE::INotify::read_file('active');
273
274 my @ta;
275 my $tlist = [];
276 my $thash = {}; # only list task once
277
278 my $check_task = sub {
279 my ($task, $running) = @_;
280
281 if ($running || PVE::ProcFSTools::check_process_running($task->{pid}, $task->{pstart})) {
282 push @$tlist, $task;
283 } else {
284 delete $task->{pid};
285 push @ta, $task;
286 }
287 delete $task->{pstart};
288 };
289
290 foreach my $task (@$tasklist) {
291 my $upid = $task->{upid};
292 next if $thash->{$upid};
293 $thash->{$upid} = $task;
294 &$check_task($task);
295 }
296
297 if ($new_upid && !(my $task = $thash->{$new_upid})) {
298 $task = PVE::Tools::upid_decode($new_upid);
299 $task->{upid} = $new_upid;
300 $thash->{$new_upid} = $task;
301 &$check_task($task, $nocheck);
302 }
303
304
305 @ta = sort { $b->{starttime} <=> $a->{starttime} } @ta;
306
307 my $save = defined($new_upid);
308
309 foreach my $task (@ta) {
310 next if $task->{endtime};
311 $task->{endtime} = time();
312 $task->{status} = PVE::Tools::upid_read_status($task->{upid});
313 $save = 1;
314 }
315
316 my $archive = '';
317 my @arlist = ();
318 foreach my $task (@ta) {
319 if (!$task->{saved}) {
320 $archive .= sprintf("%s %08X %s\n", $task->{upid}, $task->{endtime}, $task->{status});
321 $save = 1;
322 push @arlist, $task;
323 $task->{saved} = 1;
324 }
325 }
326
327 if ($archive) {
328 my $size = 0;
329 my $filename = "/var/log/pve/tasks/index";
330 eval {
331 my $fh = IO::File->new($filename, '>>', 0644) ||
332 die "unable to open file '$filename' - $!\n";
333 PVE::Tools::safe_print($filename, $fh, $archive);
334 $size = -s $fh;
335 close($fh) ||
336 die "unable to close file '$filename' - $!\n";
337 };
338 my $err = $@;
339 if ($err) {
340 syslog('err', $err);
341 foreach my $task (@arlist) { # mark as not saved
342 $task->{saved} = 0;
343 }
344 }
345 my $maxsize = 50000; # about 1000 entries
346 if ($size > $maxsize) {
347 rename($filename, "$filename.1");
348 }
349 }
350
351 # we try to reduce the amount of data
352 # list all running tasks and task and a few others
353 # try to limit to 25 tasks
354 my $max = 25 - scalar(@$tlist);
355 foreach my $task (@ta) {
356 last if $max <= 0;
357 push @$tlist, $task;
358 $max--;
359 }
360
361 PVE::INotify::write_file('active', $tlist) if $save;
362
363 return $tlist;
364 };
365
366 my $res = PVE::Tools::lock_file($lkfn, $timeout, $code);
367 die $@ if $@;
368
369 return $res;
370 }
371
372 my $kill_process_group = sub {
373 my ($pid, $pstart) = @_;
374
375 # send kill to process group (negative pid)
376 my $kpid = -$pid;
377
378 # always send signal to all pgrp members
379 kill(15, $kpid); # send TERM signal
380
381 # give max 5 seconds to shut down
382 for (my $i = 0; $i < 5; $i++) {
383 return if !PVE::ProcFSTools::check_process_running($pid, $pstart);
384 sleep (1);
385 }
386
387 # to be sure
388 kill(9, $kpid);
389 };
390
391 sub check_worker {
392 my ($self, $upid, $killit) = @_;
393
394 my $task = PVE::Tools::upid_decode($upid);
395
396 my $running = PVE::ProcFSTools::check_process_running($task->{pid}, $task->{pstart});
397
398 return 0 if !$running;
399
400 if ($killit) {
401 &$kill_process_group($task->{pid});
402 return 0;
403 }
404
405 return 1;
406 }
407
408 # acts almost as tee: writes an output both to STDOUT and a task log,
409 # we differ as we're worker aware and look also at the childs control pipe,
410 # so we know if the function could be executed successfully or not.
411 my $tee_worker = sub {
412 my ($childfd, $ctrlfd, $taskfh, $cpid) = @_;
413
414 eval {
415 my $int_count = 0;
416 local $SIG{INT} = local $SIG{QUIT} = local $SIG{TERM} = sub {
417 # always send signal to all pgrp members
418 my $kpid = -$cpid;
419 if ($int_count < 3) {
420 kill(15, $kpid); # send TERM signal
421 } else {
422 kill(9, $kpid); # send KILL signal
423 }
424 $int_count++;
425 };
426 local $SIG{PIPE} = sub { die "broken pipe\n"; };
427
428 my $select = new IO::Select;
429 my $fh = IO::Handle->new_from_fd($childfd, 'r');
430 $select->add($fh);
431
432 my $readbuf = '';
433 my $count;
434 while ($select->count) {
435 my @handles = $select->can_read(1);
436 if (scalar(@handles)) {
437 my $count = sysread ($handles[0], $readbuf, 4096);
438 if (!defined ($count)) {
439 my $err = $!;
440 die "sync pipe read error: $err\n";
441 }
442 last if $count == 0; # eof
443
444 print $readbuf;
445 select->flush();
446
447 print $taskfh $readbuf;
448 $taskfh->flush();
449 } else {
450 # some commands daemonize without closing stdout
451 last if !PVE::ProcFSTools::check_process_running($cpid);
452 }
453 }
454
455 POSIX::read($ctrlfd, $readbuf, 4096);
456 if ($readbuf =~ m/^TASK OK\n?$/) {
457 # skip printing to stdout
458 print $taskfh $readbuf;
459 } elsif ($readbuf =~ m/^TASK ERROR: (.*)\n?$/) {
460 print STDERR "$1\n";
461 print $taskfh "\n$readbuf"; # ensure start on new line for webUI
462 } elsif ($readbuf =~ m/^TASK WARNINGS: (\d+)\n?$/) {
463 print STDERR "Task finished with $1 warning(s)!\n";
464 print $taskfh "\n$readbuf"; # ensure start on new line for webUI
465 } else {
466 die "got unexpected control message: $readbuf\n";
467 }
468 $taskfh->flush();
469 };
470 my $err = $@;
471
472 POSIX::close($childfd);
473 POSIX::close($ctrlfd);
474
475 if ($err) {
476 $err =~ s/\n/ /mg;
477 print STDERR "$err\n";
478 print $taskfh "TASK ERROR: $err\n";
479 }
480 };
481
482 # start long running workers
483 # STDIN is redirected to /dev/null
484 # STDOUT,STDERR are redirected to the filename returned by upid_decode
485 # NOTE: we simulate running in foreground if ($self->{type} eq 'cli')
486 sub fork_worker {
487 my ($self, $dtype, $id, $user, $function, $background) = @_;
488
489 $dtype = 'unknown' if !defined ($dtype);
490 $id = '' if !defined ($id);
491
492 $user = 'root@pve' if !defined ($user);
493
494 my $sync = ($self->{type} eq 'cli' && !$background) ? 1 : 0;
495
496 local $SIG{INT} =
497 local $SIG{QUIT} =
498 local $SIG{PIPE} =
499 local $SIG{TERM} = 'IGNORE';
500
501 my $starttime = time ();
502
503 my @psync = POSIX::pipe();
504 my @csync = POSIX::pipe();
505 my @ctrlfd = POSIX::pipe() if $sync;
506
507 my $node = $self->{nodename};
508
509 my $cpid = fork();
510 die "unable to fork worker - $!" if !defined($cpid);
511
512 my $workerpuid = $cpid ? $cpid : $$;
513
514 my $pstart = PVE::ProcFSTools::read_proc_starttime($workerpuid) ||
515 die "unable to read process start time";
516
517 my $upid = PVE::Tools::upid_encode ({
518 node => $node, pid => $workerpuid, pstart => $pstart,
519 starttime => $starttime, type => $dtype, id => $id, user => $user });
520
521 my $outfh;
522
523 if (!$cpid) { # child
524
525 $0 = "task $upid";
526 $WORKER_FLAG = 1;
527
528 $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = sub { die "received interrupt\n"; };
529
530 $SIG{CHLD} = $SIG{PIPE} = 'DEFAULT';
531 $SIG{TTOU} = 'IGNORE';
532
533 my $ppgid;
534 # set session/process group allows to kill the process group
535 if ($sync && -t STDIN) {
536 # some sync'ed workers operate on the tty but setsid sessions lose
537 # the tty, so just create a new pgroup and give it the tty
538 $ppgid = POSIX::getpgrp() or die "failed to get old pgid: $!\n";
539 POSIX::setpgid(0, 0) or die "failed to setpgid: $!\n";
540 POSIX::tcsetpgrp(fileno(STDIN), $$) or die "failed to tcsetpgrp: $!\n";
541 } else {
542 POSIX::setsid();
543 }
544
545 POSIX::close ($psync[0]);
546 POSIX::close ($ctrlfd[0]) if $sync;
547 POSIX::close ($csync[1]);
548
549 $outfh = $sync ? $psync[1] : undef;
550 my $resfh = $sync ? $ctrlfd[1] : undef;
551
552 eval {
553 PVE::INotify::inotify_close();
554
555 if (my $atfork = $self->{atfork}) {
556 &$atfork();
557 }
558
559 # same algorithm as used inside SA
560 # STDIN = /dev/null
561 my $fd = fileno (STDIN);
562
563 if (!$sync) {
564 close STDIN;
565 POSIX::close(0) if $fd != 0;
566
567 die "unable to redirect STDIN - $!"
568 if !open(STDIN, "</dev/null");
569
570 $outfh = PVE::Tools::upid_open($upid);
571 $resfh = fileno($outfh);
572 }
573
574
575 # redirect STDOUT
576 $fd = fileno(STDOUT);
577 close STDOUT;
578 POSIX::close (1) if $fd != 1;
579
580 die "unable to redirect STDOUT - $!"
581 if !open(STDOUT, ">&", $outfh);
582
583 STDOUT->autoflush (1);
584
585 # redirect STDERR to STDOUT
586 $fd = fileno (STDERR);
587 close STDERR;
588 POSIX::close(2) if $fd != 2;
589
590 die "unable to redirect STDERR - $!"
591 if !open(STDERR, ">&1");
592
593 STDERR->autoflush(1);
594 };
595 if (my $err = $@) {
596 my $msg = "ERROR: $err";
597 POSIX::write($psync[1], $msg, length ($msg));
598 POSIX::close($psync[1]);
599 POSIX::_exit(1);
600 kill(-9, $$);
601 }
602
603 # sync with parent (signal that we are ready)
604 POSIX::write($psync[1], $upid, length ($upid));
605 POSIX::close($psync[1]) if !$sync; # don't need output pipe if async
606
607 eval {
608 my $readbuf = '';
609 # sync with parent (wait until parent is ready)
610 POSIX::read($csync[0], $readbuf, 4096);
611 die "parent setup error\n" if $readbuf ne 'OK';
612
613 if ($self->{type} eq 'ha') {
614 print "task started by HA resource agent\n";
615 }
616 &$function($upid);
617 };
618 my ($msg, $exitcode);
619 my $err = $@;
620 if ($err) {
621 chomp $err;
622 $err =~ s/\n/ /mg;
623 syslog('err', $err);
624 $msg = "TASK ERROR: $err\n";
625 $exitcode = -1;
626 } elsif (my $warnings = $self->{warning_count}) {
627 $msg = "TASK WARNINGS: $warnings\n";
628 $exitcode = 0;
629 } else {
630 $msg = "TASK OK\n";
631 $exitcode = 0;
632 }
633 POSIX::write($resfh, $msg, length($msg));
634
635 if ($sync) {
636 POSIX::close($resfh);
637 if ( -t STDIN) {
638 POSIX::tcsetpgrp(fileno(STDIN), $ppgid) or
639 die "failed to tcsetpgrp to parent: $!\n";
640 }
641 }
642 POSIX::_exit($exitcode);
643 kill(-9, $$); # not really needed, just to be sure
644 }
645
646 # parent
647
648 POSIX::close ($psync[1]);
649 POSIX::close ($ctrlfd[1]) if $sync;
650 POSIX::close ($csync[0]);
651
652 my $readbuf = '';
653 # sync with child (wait until child starts)
654 POSIX::read($psync[0], $readbuf, 4096);
655
656 if (!$sync) {
657 POSIX::close($psync[0]);
658 &$register_worker($cpid, $user, $upid);
659 } else {
660 chomp $readbuf;
661 }
662
663 eval {
664 die "got no worker upid - start worker failed\n" if !$readbuf;
665
666 if ($readbuf =~ m/^ERROR:\s*(.+)$/m) {
667 die "starting worker failed: $1\n";
668 }
669
670 if ($readbuf ne $upid) {
671 die "got strange worker upid ('$readbuf' != '$upid') - start worker failed\n";
672 }
673
674 if ($sync) {
675 $outfh = PVE::Tools::upid_open($upid);
676 }
677 };
678 my $err = $@;
679
680 if (!$err) {
681 my $msg = 'OK';
682 POSIX::write($csync[1], $msg, length ($msg));
683 POSIX::close($csync[1]);
684
685 } else {
686 POSIX::close($csync[1]);
687 kill(-9, $cpid); # make sure it gets killed
688 die $err;
689 }
690
691 $self->log_cluster_msg('info', $user, "starting task $upid");
692
693 my $tlist = $self->active_workers($upid, $sync);
694 eval { $self->broadcast_tasklist($tlist); };
695 syslog('err', $@) if $@;
696
697 my $res = 0;
698
699 if ($sync) {
700
701 $tee_worker->($psync[0], $ctrlfd[0], $outfh, $cpid);
702
703 &$kill_process_group($cpid, $pstart); # make sure it gets killed
704
705 close($outfh);
706
707 waitpid($cpid, 0);
708 $res = $?;
709 &$log_task_result($upid, $user, $res);
710 }
711
712 return wantarray ? ($upid, $res) : $upid;
713 }
714
715 sub warn {
716 my ($self, $message) = @_;
717
718 chomp($message);
719
720 print STDERR "WARN: $message\n";
721
722 $self->{warning_count}++;
723 }
724
725 # Abstract function
726
727 sub log_cluster_msg {
728 my ($self, $pri, $user, $msg) = @_;
729
730 syslog($pri, "%s", $msg);
731
732 # PVE::Cluster::log_msg($pri, $user, $msg);
733 }
734
735 sub broadcast_tasklist {
736 my ($self, $tlist) = @_;
737
738 # PVE::Cluster::broadcast_tasklist($tlist);
739 }
740
741 sub check_api2_permissions {
742 my ($self, $perm, $username, $param) = @_;
743
744 return 1 if !$username && $perm->{user} eq 'world';
745
746 raise_perm_exc("user != null") if !$username;
747
748 return 1 if $username eq 'root@pam';
749
750 raise_perm_exc('user != root@pam') if !$perm;
751
752 return 1 if $perm->{user} && $perm->{user} eq 'all';
753
754 ##return $self->exec_api2_perm_check($perm->{check}, $username, $param)
755 ##if $perm->{check};
756
757 raise_perm_exc();
758 }
759
760 # init_request - should be called before each REST/CLI request
761 sub init_request {
762 my ($self, %params) = @_;
763
764 $self->{result_attributes} = {}
765
766 # if you nedd more, implement in subclass
767 }
768
769 1;