]> git.proxmox.com Git - pve-ha-manager.git/commitdiff
add basic LRM functionality
authorDietmar Maurer <dietmar@proxmox.com>
Sat, 14 Feb 2015 09:38:27 +0000 (10:38 +0100)
committerDietmar Maurer <dietmar@proxmox.com>
Sat, 14 Feb 2015 09:38:27 +0000 (10:38 +0100)
PVE/HA/Env.pm
PVE/HA/Env/PVE2.pm
PVE/HA/LRM.pm
PVE/HA/Manager.pm
PVE/HA/Sim/Env.pm
PVE/HA/Sim/Hardware.pm
PVE/HA/Sim/RTEnv.pm

index 675e685baaf2663268d8b14ae6be8ae8b76cfded..d3642522e3cd093ffda08ba250f1a9382b0f281d 100644 (file)
@@ -39,6 +39,21 @@ sub write_manager_status {
     return $self->{plug}->write_manager_status($status_obj);
 }
 
+# lrm status is written by LRM, protected by ha_agent_lock,
+# but can be read by any node (CRM)
+
+sub read_lrm_status {
+    my ($self, $node) = @_;
+
+    return $self->{plug}->read_lrm_status($node);
+}
+
+sub write_lrm_status {
+    my ($self, $status_obj) = @_;
+
+    return $self->{plug}->write_lrm_status($status_obj);
+}
+
 # we use this to enable/disbale ha
 sub manager_status_exists {
     my ($self) = @_;
@@ -146,4 +161,10 @@ sub watchdog_close {
     return $self->{plug}->watchdog_close($wfh);
 }
 
+sub exec_resource_agent {
+    my ($self, $sid, $cmd, @params) = @_;
+
+    return $self->{plug}->exec_resource_agent($sid, $cmd, @params)
+}
+
 1;
index 7bcfaa6659d8a917aa36670446b3e2f531672b88..85546c4d38c844fa2c36d3b5cf614031839b014d 100644 (file)
@@ -48,6 +48,26 @@ sub write_manager_status {
     PVE::HA::Tools::write_json_to_file($filename, $status_obj); 
 }
 
+sub read_lrm_status {
+    my ($self, $node) = @_;
+
+    $node = $self->{nodename} if !defined($node);
+
+    my $filename = "/etc/pve/nodes/$node/lrm_status";
+
+    return PVE::HA::Tools::read_json_from_file($filename, {});  
+}
+
+sub write_lrm_status {
+    my ($self, $status_obj) = @_;
+
+    $node = $self->{nodename};
+
+    my $filename = "/etc/pve/nodes/$node/lrm_status";
+
+    PVE::HA::Tools::write_json_to_file($filename, $status_obj); 
+}
+
 sub manager_status_exists {
     my ($self) = @_;
     
@@ -182,4 +202,10 @@ sub watchdog_close {
     die "implement me";
 }
 
+sub exec_resource_agent {
+    my ($self, $sid, $cmd, @params) = @_;
+
+    die "implement me";
+}
+
 1;
index 78da10bfdec3925ca5cc8d9964e8efc05cec89d7..85ef65525ad6a0fd227142ded07fc4bd913258b9 100644 (file)
@@ -4,6 +4,8 @@ package PVE::HA::LRM;
 
 use strict;
 use warnings;
+use Data::Dumper;
+use POSIX qw(:sys_wait_h);
 
 use PVE::SafeSyslog;
 use PVE::Tools;
@@ -25,6 +27,8 @@ sub new {
     my $self = bless {
        haenv => $haenv,
        status => { state => 'startup' },
+       workers => {},
+       results => {},
     }, $class;
 
     $self->set_local_status({ state => 'wait_for_agent_lock' });   
@@ -136,6 +140,8 @@ sub do_one_iteration {
 
     # do work
 
+    $self->{service_status} = {};
+
     if ($state eq 'wait_for_agent_lock') {
 
        return 0 if $self->{shutdown_request};
@@ -169,6 +175,12 @@ sub do_one_iteration {
 
                    $shutdown = 1;
                }
+           } else {
+               my $ms = $haenv->read_manager_status();
+
+               $self->{service_status} =  $ms->{service_status} || {};
+
+               $self->manage_resources();
            }
        };
        if (my $err = $@) {
@@ -215,4 +227,154 @@ sub do_one_iteration {
     return 1;
 }
 
+sub manage_resources {
+    my ($self) = @_;
+
+    my $haenv = $self->{haenv};
+
+    my $nodename = $haenv->nodename();
+
+    my $ms = $haenv->read_manager_status();
+
+    my $ss = $self->{service_status};
+
+    foreach my $sid (keys %$ss) {
+       my $sd = $ss->{$sid};
+       next if !$sd->{node};
+       next if !$sd->{uid};
+       next if $sd->{node} ne $nodename;
+       my $req_state = $sd->{state};
+       next if !defined($req_state);
+
+       eval {
+           $self->queue_resource_command($sid, $sd->{uid}, $req_state);
+       };
+       if (my $err = $@) {
+           warn "unable to run resource agent for '$sid' - $err"; # fixme
+       }
+    }
+
+    my $starttime = time();
+
+    # start workers
+    my $max_workers = 4;
+
+    while ((time() - $starttime) < 5) {
+       my $count =  $self->check_active_workers();
+
+       foreach my $sid (keys %{$self->{workers}}) {
+           last if $count >= $max_workers;
+           my $w = $self->{workers}->{$sid};
+           if (!$w->{pid}) {
+               my $pid = fork();
+               if (!defined($pid)) {
+                   warn "fork worker failed\n";
+                   $count = 0; last; # abort, try later
+               } elsif ($pid == 0) {
+                   # do work
+                   my $res = -1;
+                   eval {
+                       $res = $haenv->exec_resource_agent($sid, $w->{state});
+                   };
+                   if (my $err = $@) {
+                       warn $err;
+                       POSIX::_exit(-1);
+                   }  
+                   POSIX::_exit($res); 
+               } else {
+                   $count++;
+                   $w->{pid} = $pid;
+               }
+           }
+       }
+
+       last if !$count;
+
+       sleep(1);
+    }
+}
+
+# fixme: use a queue an limit number of parallel workers?
+sub queue_resource_command {
+    my ($self, $sid, $uid, $state) = @_;
+
+    if (my $w = $self->{workers}->{$sid}) {
+       return if $w->{pid}; # already started
+       # else, delete and overwrite queue entry with new command
+       delete $self->{workers}->{$sid};
+    }
+
+    $self->{workers}->{$sid} = {
+       sid => $sid,
+       uid => $uid,
+       state => $state,
+    };
+}
+
+sub check_active_workers {
+    my ($self) = @_;
+
+    # finish/count workers
+    my $count = 0;
+    foreach my $sid (keys %{$self->{workers}}) {
+       my $w = $self->{workers}->{$sid};
+       if (my $pid = $w->{pid}) {
+           # check status
+           my $waitpid = waitpid($pid, WNOHANG);
+           if (defined($waitpid) && ($waitpid == $pid)) {
+               $self->resource_command_finished($sid, $w->{uid}, $?);
+           } else {
+               $count++;
+           }
+       }
+    }
+    
+    return $count;
+}
+
+sub resource_command_finished {
+    my ($self, $sid, $uid, $status) = @_;
+
+    my $haenv = $self->{haenv};
+
+    my $w = delete $self->{workers}->{$sid};
+    return if !$w; # should not happen
+
+    my $exit_code = -1;
+
+    if ($status == -1) {
+       print "RA $sid finished - failed to execute\n";    
+    }  elsif (my $sig = ($status & 127)) {
+       print "RA $sid finished - got signal $sig\n";
+    } else {
+       $exit_code = ($status >> 8);
+       print "RA $sid finished - exit code ${exit_code}\n";
+    }
+
+    $self->{results}->{$uid} = {
+       sid => $w->{sid},
+       state => $w->{state},
+       exit_code => $exit_code,
+    };
+
+    my $ss = $self->{service_status};
+
+    # compute hash of valid/existing uids
+    my $valid_uids = {};
+    foreach my $sid (keys %$ss) {
+       my $sd = $ss->{$sid};
+       next if !$sd->{uid};
+       $valid_uids->{$sd->{uid}} = 1;
+    }
+
+    my $results = {};
+    foreach my $id (keys %{$self->{results}}) {
+       next if !$valid_uids->{$id};
+       $results->{$id} = $self->{results}->{$id};
+    }
+    $self->{results} = $results;
+
+    $haenv->write_lrm_status($results);
+}
+
 1;
index 20980c89a99e2c9eb6d7390737766d3ce4a7c58f..c27bff1f2523fba7cf994960cbf6939bd4fd1c3e 100644 (file)
@@ -2,6 +2,7 @@ package PVE::HA::Manager;
 
 use strict;
 use warnings;
+use Digest::MD5 qw(md5_base64);
 
 use Data::Dumper;
 
@@ -63,6 +64,8 @@ sub select_service_node {
     return shift @$online_nodes;
 }
 
+my $uid_counter = 0;
+
 my $change_service_state = sub {
     my ($self, $sid, $new_state, %params) = @_;
 
@@ -84,6 +87,8 @@ my $change_service_state = sub {
     }
     
     $sd->{state} = $new_state;
+    $uid_counter++;
+    $sd->{uid} = md5_base64($new_state . $$ . time() . $uid_counter);
 
     # fixme: cleanup state (remove unused values)
 
@@ -91,18 +96,42 @@ my $change_service_state = sub {
     $haenv->log('info', "service '$sid': state changed to '$new_state' $changes\n");
 };
 
+# read LRM status for all nodes (even for offline nodes)
+sub read_lrm_status {
+    my ($self, $node_info) = @_;
+
+    my $haenv = $self->{haenv};
+
+    my $res = {};
+
+    foreach my $node (keys %$node_info) {
+       my $ls = $haenv->read_lrm_status($node);
+       foreach my $uid (keys %$ls) {
+           next if $res->{$uid}; # should not happen
+           $res->{$uid} = $ls->{$uid};
+       }
+    }
+
+    return $res;
+}
+
 sub manage {
     my ($self) = @_;
 
     my ($haenv, $ms, $ns, $ss) = ($self->{haenv}, $self->{ms}, $self->{ns}, $self->{ss});
 
-    $ns->update($haenv->get_node_info());
+    my ($node_info, $quorate) = $haenv->get_node_info();
+    $ns->update($node_info);
+
+    # fixme: what if $quorate is 0??
 
     if (!$ns->node_is_online($haenv->nodename())) {
        $haenv->log('info', "master seems offline\n");
        return;
     }
 
+    my $lrm_status = $self->read_lrm_status($node_info);
+
     my $sc = $haenv->read_service_config();
 
     # compute new service status
index afdcc06c7718c555de4fd44df8d6d65b73aacbbf..3c9467d2a9c8e493532b643a431ce23073aad8a9 100644 (file)
@@ -131,6 +131,22 @@ sub manager_status_exists {
     return -f $filename ? 1 : 0;
 }
 
+sub read_lrm_status {
+    my ($self, $node) = @_;
+
+    $node = $self->{nodename} if !defined($node);
+
+    return $self->{hardware}->read_lrm_status($node);
+}
+
+sub write_lrm_status {
+    my ($self, $status_obj) = @_;
+
+    my $node = $self->{nodename};
+
+    return $self->{hardware}->write_lrm_status($node, $status_obj);
+}
+
 sub read_service_config {
     my ($self) = @_;
 
@@ -243,4 +259,10 @@ sub watchdog_close {
     return $self->{hardware}->watchdog_close($wfh);
 }
 
+sub exec_resource_agent {
+    my ($self, $sid, $cmd, @params) = @_;
+
+    die "implement me";
+}
+
 1;
index e4394719337f157827c284515b05423d62c3d556..7dc4e592ded6cd917c231239a9d4450e50628755 100644 (file)
@@ -27,6 +27,7 @@ my $watchdog_timeout = 60;
 # $testdir/hardware_status   Hardware description (number of nodes, ...)
 # $testdir/manager_status    CRM status (start with {})
 # $testdir/service_config    Service configuration
+# $testdir/service_status    Service status
 
 #
 # runtime status for simulation system
@@ -37,9 +38,26 @@ my $watchdog_timeout = 60;
 #
 # runtime status
 #
-# $testdir/status/local_status_<node>  local CRM Daemon status
+# $testdir/status/lrm_status_<node>    LRM status
 # $testdir/status/manager_status       CRM status
 # $testdir/status/service_config       Service configuration
+# $testdir/status/service_status       Service status
+
+sub read_lrm_status {
+    my ($self, $node) = @_;
+
+    my $filename = "$self->{statusdir}/lrm_status_$node";
+
+    return PVE::HA::Tools::read_json_from_file($filename, {});  
+}
+
+sub write_lrm_status {
+    my ($self, $node, $status_obj) = @_;
+
+    my $filename = "$self->{statusdir}/lrm_status_$node";
+
+    PVE::HA::Tools::write_json_to_file($filename, $status_obj); 
+}
 
 sub read_hardware_status_nolock {
     my ($self) = @_;
@@ -90,6 +108,20 @@ sub write_service_config {
     return PVE::HA::Tools::write_json_to_file($filename, $conf);
 } 
 
+sub read_service_status {
+    my ($self) = @_;
+
+    my $filename = "$self->{statusdir}/service_status";
+    return PVE::HA::Tools::read_json_from_file($filename); 
+}
+
+sub write_service_status {
+    my ($self, $data) = @_;
+
+    my $filename = "$self->{statusdir}/service_status";
+    return PVE::HA::Tools::write_json_to_file($filename, $data);
+} 
+
 sub new {
     my ($this, $testdir) = @_;
 
@@ -121,6 +153,12 @@ sub new {
        $self->write_service_config($conf);
     }
 
+    if (-f "$testdir/service_status") {
+       copy("$testdir/service_status", "$statusdir/service_status");
+    } else {   
+       $self->write_service_status({});
+    }
+
     if (-f "$testdir/hardware_status") {
        copy("$testdir/hardware_status", "$statusdir/hardware_status") ||
            die "Copy failed: $!\n";
index af2b46d36b0f9908937f7446a314d4f00a18bc1b..6fa8e10a6cb09714614c1823aa2bce835fef584b 100644 (file)
@@ -71,4 +71,59 @@ sub loop_end_hook {
     die "loop take too long ($delay seconds)\n" if $delay > 30;
 }
 
+sub exec_resource_agent {
+    my ($self, $sid, $cmd, @params) = @_;
+
+    my $hardware = $self->{hardware};
+
+    my $ss = $hardware->read_service_status();
+
+    if ($cmd eq 'request_stop') {
+
+       if (!$ss->{$sid}) {
+           print "WORKER status $sid: stopped\n";
+           return 0;
+       } else {
+           print "WORKER status $sid: running\n";
+           return 1;
+       }
+
+    } elsif ($cmd eq 'start') {
+
+       if ($ss->{$sid}) {
+           print "WORKER status $sid: running\n";
+           return 0;
+       }
+       print "START WORKER $sid\n";
+       
+       $self->sleep(2);
+
+       $ss->{$sid} = 1;
+       $hardware->write_service_status($ss);
+
+       print "END WORKER $sid\n";
+
+       return 0;
+
+    } elsif ($cmd eq 'stop') {
+
+       if (!$ss->{$sid}) {
+           print "WORKER status $sid: stopped\n";
+           return 0;
+       }
+       print "STOP WORKER $sid\n";
+       
+       $self->sleep(2);
+
+       $ss->{$sid} = 0;
+       $hardware->write_service_status($ss);
+
+       print "END WORKER $sid\n";
+
+       return 0;
+    } 
+
+    die "implement me";
+}
+
 1;