use strict;
use warnings;
+use Data::Dumper;
+use POSIX qw(:sys_wait_h);
use PVE::SafeSyslog;
use PVE::Tools;
my $self = bless {
haenv => $haenv,
status => { state => 'startup' },
+ workers => {},
+ results => {},
}, $class;
$self->set_local_status({ state => 'wait_for_agent_lock' });
# do work
+ $self->{service_status} = {};
+
if ($state eq 'wait_for_agent_lock') {
return 0 if $self->{shutdown_request};
$shutdown = 1;
}
+ } else {
+ my $ms = $haenv->read_manager_status();
+
+ $self->{service_status} = $ms->{service_status} || {};
+
+ $self->manage_resources();
}
};
if (my $err = $@) {
return 1;
}
+# Queue a resource agent command for every service assigned to this node
+# and run queued commands in forked worker processes.
+#
+# Works on the snapshot in $self->{service_status}, a hash mapping a
+# service id to a hash with the keys 'node', 'uid' and 'state'. Entries
+# without node, uid or state, and entries for other nodes, are skipped.
+#
+# After queueing, the sub spends up to about five seconds starting workers
+# (never more than $max_workers active at once) and reaping finished ones
+# through check_active_workers(). Commands that were not started in that
+# window stay in $self->{workers} without a pid.
+#
+# No defined return value.
+sub manage_resources {
+    my ($self) = @_;
+
+    my $haenv = $self->{haenv};
+
+    my $nodename = $haenv->nodename();
+
+    # The manager status is not read again here; only the snapshot stored
+    # in $self->{service_status} is used.
+    my $ss = $self->{service_status};
+
+    foreach my $sid (keys %$ss) {
+        my $sd = $ss->{$sid};
+        next if !$sd->{node};
+        next if !$sd->{uid};
+        next if $sd->{node} ne $nodename;
+        my $req_state = $sd->{state};
+        next if !defined($req_state);
+
+        eval {
+            $self->queue_resource_command($sid, $sd->{uid}, $req_state);
+        };
+        if (my $err = $@) {
+            warn "unable to run resource agent for '$sid' - $err"; # fixme
+        }
+    }
+
+    my $starttime = time();
+
+    # start workers
+    my $max_workers = 4;
+
+    while ((time() - $starttime) < 5) {
+        # reap finished workers first, so their slots become free
+        my $count = $self->check_active_workers();
+
+        foreach my $sid (keys %{$self->{workers}}) {
+            last if $count >= $max_workers;
+
+            my $w = $self->{workers}->{$sid};
+            next if $w->{pid}; # already running
+
+            my $pid = fork();
+            if (!defined($pid)) {
+                warn "fork worker failed\n";
+                # abort: a zero count also ends the outer loop below, the
+                # queued commands are retried on the next call
+                $count = 0;
+                last;
+            } elsif ($pid == 0) {
+                # child: run the resource agent, pass its result back as
+                # the process exit status
+                my $res = -1;
+                eval {
+                    $res = $haenv->exec_resource_agent($sid, $w->{state});
+                };
+                if (my $err = $@) {
+                    warn $err;
+                    POSIX::_exit(-1);
+                }
+                # an undefined result would otherwise turn into exit
+                # status 0 - report it as a failure instead
+                $res = -1 if !defined($res);
+                POSIX::_exit($res);
+            } else {
+                # parent: remember the worker pid
+                $count++;
+                $w->{pid} = $pid;
+            }
+        }
+
+        # nothing active (or fork failed) - no reason to wait any longer
+        last if !$count;
+
+        sleep(1);
+    }
+}
+
+# fixme: use a queue and limit number of parallel workers?
+# Queue a resource agent command for service $sid.
+#
+# $uid and $state are stored together with $sid in $self->{workers}->{$sid}.
+# If the existing entry for $sid already has a pid (its worker was started),
+# it is left untouched. An entry that is only queued is replaced by the new
+# command.
+sub queue_resource_command {
+    my ($self, $sid, $uid, $state) = @_;
+
+    my $queued = $self->{workers}->{$sid};
+
+    # a worker for this service is already running - keep its entry
+    return if $queued && $queued->{pid};
+
+    # nothing queued yet, or queued but not started: store the new command
+    $self->{workers}->{$sid} = {
+        sid => $sid,
+        uid => $uid,
+        state => $state,
+    };
+}
+
+# Reap finished worker processes and count the ones still running.
+#
+# Every entry in $self->{workers} that has a pid is polled with a
+# non-blocking waitpid(). A finished worker is handed to
+# resource_command_finished() together with its wait status. A worker whose
+# process cannot be waited for at all (waitpid() returns -1) is handed over
+# with status -1, so that its entry does not stay "active" forever.
+# Entries without a pid are queued only and are ignored here.
+#
+# Returns the number of workers that are still running.
+sub check_active_workers {
+    my ($self) = @_;
+
+    # finish/count workers
+    my $count = 0;
+    foreach my $sid (keys %{$self->{workers}}) {
+        my $w = $self->{workers}->{$sid};
+        my $pid = $w->{pid};
+        next if !$pid; # not started yet
+
+        # check status
+        my $waitpid = waitpid($pid, WNOHANG);
+        my $status = $?; # save it before anything else can change $?
+
+        if (defined($waitpid) && ($waitpid == $pid)) {
+            $self->resource_command_finished($sid, $w->{uid}, $status);
+        } elsif (defined($waitpid) && ($waitpid == -1)) {
+            # no such child process - it can never be reaped, so report
+            # the command as failed instead of counting it as running
+            $self->resource_command_finished($sid, $w->{uid}, -1);
+        } else {
+            $count++;
+        }
+    }
+
+    return $count;
+}
+
+# Record the outcome of a finished resource agent worker.
+#
+# Removes the queue entry for $sid from $self->{workers}, decodes the wait
+# status $status into an exit code (-1 when the status is -1 or the worker
+# was ended by a signal) and stores the result under $uid in
+# $self->{results}. Afterwards only results whose uid still occurs in
+# $self->{service_status} are kept, and that set is passed to
+# $haenv->write_lrm_status().
+#
+# Returns early, without doing anything, if no queue entry exists for $sid.
+sub resource_command_finished {
+    my ($self, $sid, $uid, $status) = @_;
+
+    my $worker = delete $self->{workers}->{$sid};
+    return if !$worker; # should not happen
+
+    my $haenv = $self->{haenv};
+
+    # decode the wait status; anything but a normal exit is reported as -1
+    my $exit_code = -1;
+
+    if ($status == -1) {
+        print "RA $sid finished - failed to execute\n";
+    } elsif (my $sig = ($status & 127)) {
+        print "RA $sid finished - got signal $sig\n";
+    } else {
+        $exit_code = ($status >> 8);
+        print "RA $sid finished - exit code ${exit_code}\n";
+    }
+
+    $self->{results}->{$uid} = {
+        sid => $worker->{sid},
+        state => $worker->{state},
+        exit_code => $exit_code,
+    };
+
+    # uids that are still present in the current service status
+    my $ss = $self->{service_status};
+    my %is_valid_uid =
+        map { $_->{uid} => 1 }
+        grep { $_->{uid} }
+        values %$ss;
+
+    # drop results that belong to uids which are no longer known
+    my $kept = {};
+    foreach my $id (grep { $is_valid_uid{$_} } keys %{$self->{results}}) {
+        $kept->{$id} = $self->{results}->{$id};
+    }
+    $self->{results} = $kept;
+
+    $haenv->write_lrm_status($kept);
+}
+
1;