return $self->{plug}->write_manager_status($status_obj);
}
+# lrm status is written by LRM, protected by ha_agent_lock,
+# but can be read by any node (CRM)
+
+# Read the LRM status of $node via the configured plugin backend.
+sub read_lrm_status {
+    my ($self, $node) = @_;
+
+    my $plug = $self->{plug};
+    return $plug->read_lrm_status($node);
+}
+
+# Write the local LRM status via the configured plugin backend.
+sub write_lrm_status {
+    my ($self, $status_obj) = @_;
+
+    my $plug = $self->{plug};
+    return $plug->write_lrm_status($status_obj);
+}
+
# we use this to enable/disbale ha
sub manager_status_exists {
my ($self) = @_;
return $self->{plug}->watchdog_close($wfh);
}
+# Execute resource-agent command $cmd (with @params) for service $sid,
+# delegating to the plugin backend. Returns the backend's result.
+sub exec_resource_agent {
+    my ($self, $sid, $cmd, @params) = @_;
+
+    # add the missing trailing semicolon for consistency with the
+    # other wrapper methods (previously relied on being the last
+    # statement of the block)
+    return $self->{plug}->exec_resource_agent($sid, $cmd, @params);
+}
+
1;
PVE::HA::Tools::write_json_to_file($filename, $status_obj);
}
+# Read the LRM status file of $node (defaults to the local node);
+# a missing file yields an empty hash.
+sub read_lrm_status {
+    my ($self, $node) = @_;
+
+    $node = $self->{nodename} unless defined $node;
+
+    my $filename = "/etc/pve/nodes/$node/lrm_status";
+
+    return PVE::HA::Tools::read_json_from_file($filename, {});
+}
+
+# Write $status_obj as the local node's LRM status file.
+sub write_lrm_status {
+    my ($self, $status_obj) = @_;
+
+    # fix: $node was assigned without 'my', which is a compile error
+    # under 'use strict' (or an accidental package global without it)
+    my $node = $self->{nodename};
+
+    my $filename = "/etc/pve/nodes/$node/lrm_status";
+
+    PVE::HA::Tools::write_json_to_file($filename, $status_obj);
+}
+
sub manager_status_exists {
my ($self) = @_;
die "implement me";
}
+# Abstract hook: run resource-agent command $cmd (with @params) for
+# service $sid. Concrete environment implementations must override this.
+sub exec_resource_agent {
+ my ($self, $sid, $cmd, @params) = @_;
+
+ die "implement me";
+}
+
1;
use strict;
use warnings;
+use Data::Dumper;
+use POSIX qw(:sys_wait_h);
use PVE::SafeSyslog;
use PVE::Tools;
my $self = bless {
haenv => $haenv,
status => { state => 'startup' },
+ workers => {},
+ results => {},
}, $class;
$self->set_local_status({ state => 'wait_for_agent_lock' });
# do work
+ $self->{service_status} = {};
+
if ($state eq 'wait_for_agent_lock') {
return 0 if $self->{shutdown_request};
$shutdown = 1;
}
+ } else {
+ my $ms = $haenv->read_manager_status();
+
+ $self->{service_status} = $ms->{service_status} || {};
+
+ $self->manage_resources();
}
};
if (my $err = $@) {
return 1;
}
+# Bring local resources into the state requested by the CRM.
+#
+# Scans $self->{service_status} for services placed on this node,
+# queues one resource-agent command per service, then forks up to
+# $max_workers children to execute them. Each child reports its RA
+# result via its exit status (reaped by check_active_workers()).
+# The loop gives up after ~5 seconds so the main LRM loop stays
+# responsive; remaining work is retried on the next iteration.
+sub manage_resources {
+    my ($self) = @_;
+
+    my $haenv = $self->{haenv};
+
+    my $nodename = $haenv->nodename();
+
+    # fix: dropped unused 'my $ms = $haenv->read_manager_status();' -
+    # the result was never used here (do_one_iteration already reads
+    # the manager status), so this was a redundant status read
+    my $ss = $self->{service_status};
+
+    # queue commands for all services assigned to this node
+    foreach my $sid (keys %$ss) {
+        my $sd = $ss->{$sid};
+        next if !$sd->{node};
+        next if !$sd->{uid};
+        next if $sd->{node} ne $nodename;
+        my $req_state = $sd->{state};
+        next if !defined($req_state);
+
+        eval {
+            $self->queue_resource_command($sid, $sd->{uid}, $req_state);
+        };
+        if (my $err = $@) {
+            warn "unable to run resource agent for '$sid' - $err"; # fixme
+        }
+    }
+
+    my $starttime = time();
+
+    # start workers
+    my $max_workers = 4;
+
+    while ((time() - $starttime) < 5) {
+        my $count = $self->check_active_workers();
+
+        foreach my $sid (keys %{$self->{workers}}) {
+            last if $count >= $max_workers;
+            my $w = $self->{workers}->{$sid};
+            if (!$w->{pid}) {
+                my $pid = fork();
+                if (!defined($pid)) {
+                    warn "fork worker failed\n";
+                    $count = 0; last; # abort, try later
+                } elsif ($pid == 0) {
+                    # child: execute the RA and exit with its result code
+                    my $res = -1;
+                    eval {
+                        $res = $haenv->exec_resource_agent($sid, $w->{state});
+                    };
+                    if (my $err = $@) {
+                        warn $err;
+                        POSIX::_exit(-1);
+                    }
+                    # _exit() avoids running the parent's cleanup/END blocks
+                    POSIX::_exit($res);
+                } else {
+                    $count++;
+                    $w->{pid} = $pid;
+                }
+            }
+        }
+
+        # stop once nothing is running (also bails out after a failed
+        # fork, which resets $count to 0 above)
+        last if !$count;
+
+        sleep(1);
+    }
+}
+
+# fixme: use a queue and limit the number of parallel workers?
+# Queue a resource-agent command for service $sid. A command whose
+# worker is already running is left alone; a queued-but-unstarted
+# entry is replaced by the new command.
+sub queue_resource_command {
+    my ($self, $sid, $uid, $state) = @_;
+
+    my $workers = $self->{workers};
+
+    if (my $queued = $workers->{$sid}) {
+        # a forked worker already handles this service - keep it
+        return if $queued->{pid};
+        # not started yet - replace the stale queue entry below
+        delete $workers->{$sid};
+    }
+
+    $workers->{$sid} = {
+        sid => $sid,
+        uid => $uid,
+        state => $state,
+    };
+}
+
+# Reap any finished worker processes (dispatching their wait status to
+# resource_command_finished) and return the number still running.
+sub check_active_workers {
+    my ($self) = @_;
+
+    my $active = 0;
+
+    foreach my $sid (keys %{$self->{workers}}) {
+        my $worker = $self->{workers}->{$sid};
+        my $pid = $worker->{pid} or next;    # queued but not started
+
+        # non-blocking check whether the child has exited
+        if (waitpid($pid, WNOHANG) == $pid) {
+            $self->resource_command_finished($sid, $worker->{uid}, $?);
+        } else {
+            $active++;
+        }
+    }
+
+    return $active;
+}
+
+# Bookkeeping after a worker child was reaped: record the RA exit code
+# under the command uid, garbage-collect results whose uid no longer
+# appears in the current service status, and persist the remaining
+# results via the environment's LRM status (read by the CRM).
+sub resource_command_finished {
+ my ($self, $sid, $uid, $status) = @_;
+
+ my $haenv = $self->{haenv};
+
+ my $w = delete $self->{workers}->{$sid};
+ return if !$w; # should not happen
+
+ my $exit_code = -1;
+
+ # $status is the raw wait status ($?) of the worker child
+ # NOTE(review): assumes -1 marks a failed execution - confirm
+ if ($status == -1) {
+ print "RA $sid finished - failed to execute\n";
+ } elsif (my $sig = ($status & 127)) {
+ print "RA $sid finished - got signal $sig\n";
+ } else {
+ # normal termination - high byte of the wait status is the exit code
+ $exit_code = ($status >> 8);
+ print "RA $sid finished - exit code ${exit_code}\n";
+ }
+
+ $self->{results}->{$uid} = {
+ sid => $w->{sid},
+ state => $w->{state},
+ exit_code => $exit_code,
+ };
+
+ my $ss = $self->{service_status};
+
+ # compute hash of valid/existing uids
+ my $valid_uids = {};
+ foreach my $sid (keys %$ss) {
+ my $sd = $ss->{$sid};
+ next if !$sd->{uid};
+ $valid_uids->{$sd->{uid}} = 1;
+ }
+
+ # keep only results whose uid is still referenced by some service
+ my $results = {};
+ foreach my $id (keys %{$self->{results}}) {
+ next if !$valid_uids->{$id};
+ $results->{$id} = $self->{results}->{$id};
+ }
+ $self->{results} = $results;
+
+ $haenv->write_lrm_status($results);
+}
+
1;
use strict;
use warnings;
+use Digest::MD5 qw(md5_base64);
use Data::Dumper;
return shift @$online_nodes;
}
+my $uid_counter = 0;
+
my $change_service_state = sub {
my ($self, $sid, $new_state, %params) = @_;
}
$sd->{state} = $new_state;
+ $uid_counter++;
+ $sd->{uid} = md5_base64($new_state . $$ . time() . $uid_counter);
# fixme: cleanup state (remove unused values)
$haenv->log('info', "service '$sid': state changed to '$new_state' $changes\n");
};
+# Merge the LRM status entries of every node in $node_info (offline
+# nodes included) into a single uid-indexed hash.
+sub read_lrm_status {
+    my ($self, $node_info) = @_;
+
+    my $haenv = $self->{haenv};
+
+    my $merged = {};
+
+    foreach my $node (keys %$node_info) {
+        my $status = $haenv->read_lrm_status($node);
+        foreach my $uid (keys %$status) {
+            # uids are expected to be unique across nodes
+            next if $merged->{$uid}; # should not happen
+            $merged->{$uid} = $status->{$uid};
+        }
+    }
+
+    return $merged;
+}
+
sub manage {
my ($self) = @_;
my ($haenv, $ms, $ns, $ss) = ($self->{haenv}, $self->{ms}, $self->{ns}, $self->{ss});
- $ns->update($haenv->get_node_info());
+ my ($node_info, $quorate) = $haenv->get_node_info();
+ $ns->update($node_info);
+
+ # fixme: what if $quorate is 0??
if (!$ns->node_is_online($haenv->nodename())) {
$haenv->log('info', "master seems offline\n");
return;
}
+ my $lrm_status = $self->read_lrm_status($node_info);
+
my $sc = $haenv->read_service_config();
# compute new service status
return -f $filename ? 1 : 0;
}
+# Read the LRM status of $node (defaults to the local node) from the
+# simulated hardware.
+sub read_lrm_status {
+    my ($self, $node) = @_;
+
+    $node = $self->{nodename} unless defined $node;
+
+    return $self->{hardware}->read_lrm_status($node);
+}
+
+# Write the local node's LRM status to the simulated hardware.
+sub write_lrm_status {
+    my ($self, $status_obj) = @_;
+
+    # the LRM only ever writes its own node's status
+    return $self->{hardware}->write_lrm_status($self->{nodename}, $status_obj);
+}
+
sub read_service_config {
my ($self) = @_;
return $self->{hardware}->watchdog_close($wfh);
}
+# Abstract hook: run resource-agent command $cmd (with @params) for
+# service $sid. Concrete environment implementations must override this.
+sub exec_resource_agent {
+ my ($self, $sid, $cmd, @params) = @_;
+
+ die "implement me";
+}
+
1;
# $testdir/hardware_status Hardware description (number of nodes, ...)
# $testdir/manager_status CRM status (start with {})
# $testdir/service_config Service configuration
+# $testdir/service_status Service status
#
# runtime status for simulation system
#
# runtime status
#
-# $testdir/status/local_status_<node> local CRM Daemon status
+# $testdir/status/lrm_status_<node> LRM status
# $testdir/status/manager_status CRM status
# $testdir/status/service_config Service configuration
+# $testdir/status/service_status Service status
+
+# Read the per-node LRM status file from the test status directory;
+# a missing file yields an empty hash.
+sub read_lrm_status {
+    my ($self, $node) = @_;
+
+    my $path = "$self->{statusdir}/lrm_status_$node";
+
+    return PVE::HA::Tools::read_json_from_file($path, {});
+}
+
+# Write $status_obj as the per-node LRM status file in the test
+# status directory.
+sub write_lrm_status {
+    my ($self, $node, $status_obj) = @_;
+
+    my $path = "$self->{statusdir}/lrm_status_$node";
+
+    PVE::HA::Tools::write_json_to_file($path, $status_obj);
+}
sub read_hardware_status_nolock {
my ($self) = @_;
return PVE::HA::Tools::write_json_to_file($filename, $conf);
}
+# Read the simulated service status (sid => running flag) from the
+# test status directory.
+sub read_service_status {
+    my ($self) = @_;
+
+    my $filename = "$self->{statusdir}/service_status";
+    # pass an empty-hash default, consistent with read_lrm_status(),
+    # so callers get a usable hashref even if the file is absent
+    return PVE::HA::Tools::read_json_from_file($filename, {});
+}
+
+# Persist the simulated service status (sid => running flag) to the
+# test status directory.
+sub write_service_status {
+    my ($self, $data) = @_;
+
+    my $path = "$self->{statusdir}/service_status";
+    return PVE::HA::Tools::write_json_to_file($path, $data);
+}
+
sub new {
my ($this, $testdir) = @_;
$self->write_service_config($conf);
}
+ if (-f "$testdir/service_status") {
+ copy("$testdir/service_status", "$statusdir/service_status");
+ } else {
+ $self->write_service_status({});
+ }
+
if (-f "$testdir/hardware_status") {
copy("$testdir/hardware_status", "$statusdir/hardware_status") ||
die "Copy failed: $!\n";
die "loop take too long ($delay seconds)\n" if $delay > 30;
}
+# Simulated resource agent. A service's state is a simple truthy flag
+# per $sid kept in the shared service_status file. Return values follow
+# exit-code conventions: 0 = success / desired state reached, 1 = not
+# (yet) in the desired state. Unknown commands die.
+sub exec_resource_agent {
+ my ($self, $sid, $cmd, @params) = @_;
+
+ my $hardware = $self->{hardware};
+
+ my $ss = $hardware->read_service_status();
+
+ if ($cmd eq 'request_stop') {
+
+ # only reports the current state (0 = already stopped)
+ if (!$ss->{$sid}) {
+ print "WORKER status $sid: stopped\n";
+ return 0;
+ } else {
+ print "WORKER status $sid: running\n";
+ return 1;
+ }
+
+ } elsif ($cmd eq 'start') {
+
+ # already running - nothing to do
+ if ($ss->{$sid}) {
+ print "WORKER status $sid: running\n";
+ return 0;
+ }
+ print "START WORKER $sid\n";
+
+ # simulate the time a real start takes
+ $self->sleep(2);
+
+ $ss->{$sid} = 1;
+ $hardware->write_service_status($ss);
+
+ print "END WORKER $sid\n";
+
+ return 0;
+
+ } elsif ($cmd eq 'stop') {
+
+ # already stopped - nothing to do
+ if (!$ss->{$sid}) {
+ print "WORKER status $sid: stopped\n";
+ return 0;
+ }
+ print "STOP WORKER $sid\n";
+
+ # simulate the time a real stop takes
+ $self->sleep(2);
+
+ $ss->{$sid} = 0;
+ $hardware->write_service_status($ss);
+
+ print "END WORKER $sid\n";
+
+ return 0;
+ }
+
+ die "implement me";
+}
+
1;