]> git.proxmox.com Git - pve-manager.git/blob - PVE/Service/pvescheduler.pm
pvescheduler: reworking child pid tracking
[pve-manager.git] / PVE / Service / pvescheduler.pm
1 package PVE::Service::pvescheduler;
2
3 use strict;
4 use warnings;
5
6 use POSIX qw(WNOHANG);
7
8 use PVE::Jobs;
9 use PVE::SafeSyslog;
10
11 use PVE::API2::Replication;
12
13 use PVE::Daemon;
14 use base qw(PVE::Daemon);
15
# Program name plus arguments of this process, handed to the daemon constructor below.
my $cmdline = [$0, @ARGV];

# Options passed through to the daemon constructor.
# NOTE(review): stop_wait_time is presumably given in seconds and max_workers => 0
# presumably turns off the base class' own worker handling - confirm against PVE::Daemon.
my %daemon_options = (
    stop_wait_time => 180,
    max_workers => 0,
);

my $daemon = __PACKAGE__->new('pvescheduler', $cmdline, %daemon_options);

# Job types handled by the scheduler; each type is used as a key in $self->{jobs}, whose
# value is the PID of the child currently running that type.
my @types = ('replication', 'jobs');
21
# Collect finished job children and drop them from the PID tracking hash.
#
# Parameters:
#   $self - daemon object; $self->{jobs} maps each job type from @types to the PID of
#           the child currently tracked for that type (undef/missing if there is none).
#
# Never blocks, as every waitpid() call uses WNOHANG.
my $finish_jobs = sub {
    my ($self) = @_;

    for my $type (@types) {
        my $cpid = $self->{jobs}->{$type};
        next if !$cpid;

        my $waitpid = waitpid($cpid, WNOHANG);
        next if !defined($waitpid);

        # $waitpid == $cpid: the child exited and got reaped just now.
        # $waitpid == -1:    there is no such child (any more), e.g., because it was
        #                    already reaped elsewhere. Keeping its PID would block this
        #                    job type from ever being started again and a later kill()
        #                    could hit an unrelated process that re-used the PID.
        # $waitpid == 0:     the child is still running, keep tracking it.
        if ($waitpid == $cpid || $waitpid == -1) {
            $self->{jobs}->{$type} = undef;
        }
    }
};
33
# Main loop of the scheduler daemon.
#
# Each iteration forks one child per job type ('replication' and 'jobs'), unless the
# child started for that type in an earlier iteration is still tracked as running, and
# then sleeps for up to 60 seconds, trying to wake up near the minute boundary.
#
# Once $self->{shutdown_request} is set the loop is left; children that are still
# tracked get a TERM signal every 5 seconds and, after a total grace period of 75
# seconds, a KILL signal.
#
# Dies if fork() fails.
sub run {
    my ($self) = @_;

    my $jobs = {};
    $self->{jobs} = $jobs;

    my $old_sig_chld = $SIG{CHLD};
    local $SIG{CHLD} = sub {
        local ($@, $!, $?); # do not overwrite error vars
        $finish_jobs->($self);
        # The previous disposition can be a plain string like 'DEFAULT' or 'IGNORE'; such
        # a value must not be called, so only chain up to a real code reference.
        $old_sig_chld->(@_) if ref($old_sig_chld) eq 'CODE';
    };

    my $fork = sub {
        my ($type, $sub) = @_;

        # A child may exit before its PID got recorded at the end of this closure. The
        # SIGCHLD handler only waits for recorded PIDs, so it cannot reap such a child;
        # collect possible leftovers explicitly before checking the tracking entry.
        $finish_jobs->($self);

        # don't fork again if the previous iteration still runs
        return if defined($self->{jobs}->{$type});

        my $child = fork();
        if (!defined($child)) {
            die "fork failed: $!\n";
        } elsif ($child == 0) {
            $self->after_fork_cleanup();
            eval {
                $sub->();
            };
            if (my $err = $@) {
                syslog('err', "ERROR: $err");
            }
            # leave the child without running END blocks or destructors
            POSIX::_exit(0);
        }

        $jobs->{$type} = $child;
    };

    my $run_jobs = sub {

        $fork->('replication', sub {
            PVE::API2::Replication::run_jobs(undef, sub {}, 0, 1);
        });

        $fork->('jobs', sub {
            PVE::Jobs::run_jobs();
        });
    };

    PVE::Jobs::setup_dirs();

    # $count starts at 1000 so that the very first iteration already aligns the sleep
    # time to the minute boundary; afterwards this is redone every 1000 iterations.
    for (my $count = 1000;;$count++) {
        last if $self->{shutdown_request};

        $run_jobs->();

        my $sleep_time = 60;
        if ($count >= 1000) {
            # Job schedule has minute precision, so try running near the minute boundary.
            my ($current_seconds) = localtime;
            $sleep_time = (60 - $current_seconds) if (60 - $current_seconds >= 5);
            $count = 0;
        }

        my $slept = 0; # SIGCHLD interrupts sleep, so we need to keep track
        while ($slept < $sleep_time) {
            last if $self->{shutdown_request};
            $slept += sleep($sleep_time - $slept);
        }
    }

    # drop children that are gone already, so that they neither get signalled nor waited for
    $finish_jobs->($self);

    # replication jobs have a lock timeout of 60s, wait a bit more for graceful termination
    my $timeout = 0;
    for my $type (@types) {
        while (defined($jobs->{$type}) && $timeout < 75) {
            kill 'TERM', $jobs->{$type};
            $timeout += sleep(5);
            # do not rely solely on the SIGCHLD handler for noticing that the child exited
            $finish_jobs->($self);
        }
        # ensure the rest gets stopped
        kill 'KILL', $jobs->{$type} if defined($jobs->{$type});
    }
}
114
# Handle a shutdown request: send TERM to every job child that is currently tracked in
# $self->{jobs} and set $self->{shutdown_request}, which run() checks to leave its loop.
sub shutdown {
    my ($self) = @_;

    syslog('info', 'got shutdown request, signal running jobs to stop');

    for my $type (@types) {
        my $pid = $self->{jobs}->{$type};
        next if !$pid; # nothing tracked for this job type

        kill 'TERM', $pid;
    }

    $self->{shutdown_request} = 1;
}
125
# Register the start, stop and status commands on the daemon object.
$daemon->register_start_command();
$daemon->register_stop_command();
$daemon->register_status_command();

# Command table of this service. The code reference in the 'status' entry prints the
# value it gets passed, followed by a newline.
our $cmddef = {
    start => [ __PACKAGE__, 'start', [] ],
    stop => [ __PACKAGE__, 'stop', [] ],
    status => [
        __PACKAGE__,
        'status',
        [],
        undef,
        sub {
            my ($status) = @_;
            print $status . "\n";
        },
    ],
};
135
136 1;