]> git.proxmox.com Git - pve-ha-manager.git/blobdiff - PVE/HA/LRM.pm
add basic LRM functionality
[pve-ha-manager.git] / PVE / HA / LRM.pm
index 78da10bfdec3925ca5cc8d9964e8efc05cec89d7..85ef65525ad6a0fd227142ded07fc4bd913258b9 100644 (file)
@@ -4,6 +4,8 @@ package PVE::HA::LRM;
 
 use strict;
 use warnings;
+use Data::Dumper;
+use POSIX qw(:sys_wait_h);
 
 use PVE::SafeSyslog;
 use PVE::Tools;
@@ -25,6 +27,8 @@ sub new {
     my $self = bless {
        haenv => $haenv,
        status => { state => 'startup' },
+       workers => {},
+       results => {},
     }, $class;
 
     $self->set_local_status({ state => 'wait_for_agent_lock' });   
@@ -136,6 +140,8 @@ sub do_one_iteration {
 
     # do work
 
+    $self->{service_status} = {};
+
     if ($state eq 'wait_for_agent_lock') {
 
        return 0 if $self->{shutdown_request};
@@ -169,6 +175,12 @@ sub do_one_iteration {
 
                    $shutdown = 1;
                }
+           } else {
+               my $ms = $haenv->read_manager_status();
+
+               $self->{service_status} =  $ms->{service_status} || {};
+
+               $self->manage_resources();
            }
        };
        if (my $err = $@) {
@@ -215,4 +227,154 @@ sub do_one_iteration {
     return 1;
 }
 
+sub manage_resources {
+    my ($self) = @_;
+
+    my $haenv = $self->{haenv};
+
+    my $nodename = $haenv->nodename();
+
+    my $ms = $haenv->read_manager_status();
+
+    my $ss = $self->{service_status};
+
+    foreach my $sid (keys %$ss) {
+       my $sd = $ss->{$sid};
+       next if !$sd->{node};
+       next if !$sd->{uid};
+       next if $sd->{node} ne $nodename;
+       my $req_state = $sd->{state};
+       next if !defined($req_state);
+
+       eval {
+           $self->queue_resource_command($sid, $sd->{uid}, $req_state);
+       };
+       if (my $err = $@) {
+           warn "unable to run resource agent for '$sid' - $err"; # fixme
+       }
+    }
+
+    my $starttime = time();
+
+    # start workers
+    my $max_workers = 4;
+
+    while ((time() - $starttime) < 5) {
+       my $count =  $self->check_active_workers();
+
+       foreach my $sid (keys %{$self->{workers}}) {
+           last if $count >= $max_workers;
+           my $w = $self->{workers}->{$sid};
+           if (!$w->{pid}) {
+               my $pid = fork();
+               if (!defined($pid)) {
+                   warn "fork worker failed\n";
+                   $count = 0; last; # abort, try later
+               } elsif ($pid == 0) {
+                   # do work
+                   my $res = -1;
+                   eval {
+                       $res = $haenv->exec_resource_agent($sid, $w->{state});
+                   };
+                   if (my $err = $@) {
+                       warn $err;
+                       POSIX::_exit(-1);
+                   }  
+                   POSIX::_exit($res); 
+               } else {
+                   $count++;
+                   $w->{pid} = $pid;
+               }
+           }
+       }
+
+       last if !$count;
+
+       sleep(1);
+    }
+}
+
+# fixme: use a queue an limit number of parallel workers?
+sub queue_resource_command {
+    my ($self, $sid, $uid, $state) = @_;
+
+    if (my $w = $self->{workers}->{$sid}) {
+       return if $w->{pid}; # already started
+       # else, delete and overwrite queue entry with new command
+       delete $self->{workers}->{$sid};
+    }
+
+    $self->{workers}->{$sid} = {
+       sid => $sid,
+       uid => $uid,
+       state => $state,
+    };
+}
+
+sub check_active_workers {
+    my ($self) = @_;
+
+    # finish/count workers
+    my $count = 0;
+    foreach my $sid (keys %{$self->{workers}}) {
+       my $w = $self->{workers}->{$sid};
+       if (my $pid = $w->{pid}) {
+           # check status
+           my $waitpid = waitpid($pid, WNOHANG);
+           if (defined($waitpid) && ($waitpid == $pid)) {
+               $self->resource_command_finished($sid, $w->{uid}, $?);
+           } else {
+               $count++;
+           }
+       }
+    }
+    
+    return $count;
+}
+
+sub resource_command_finished {
+    my ($self, $sid, $uid, $status) = @_;
+
+    my $haenv = $self->{haenv};
+
+    my $w = delete $self->{workers}->{$sid};
+    return if !$w; # should not happen
+
+    my $exit_code = -1;
+
+    if ($status == -1) {
+       print "RA $sid finished - failed to execute\n";    
+    }  elsif (my $sig = ($status & 127)) {
+       print "RA $sid finished - got signal $sig\n";
+    } else {
+       $exit_code = ($status >> 8);
+       print "RA $sid finished - exit code ${exit_code}\n";
+    }
+
+    $self->{results}->{$uid} = {
+       sid => $w->{sid},
+       state => $w->{state},
+       exit_code => $exit_code,
+    };
+
+    my $ss = $self->{service_status};
+
+    # compute hash of valid/existing uids
+    my $valid_uids = {};
+    foreach my $sid (keys %$ss) {
+       my $sd = $ss->{$sid};
+       next if !$sd->{uid};
+       $valid_uids->{$sd->{uid}} = 1;
+    }
+
+    my $results = {};
+    foreach my $id (keys %{$self->{results}}) {
+       next if !$valid_uids->{$id};
+       $results->{$id} = $self->{results}->{$id};
+    }
+    $self->{results} = $results;
+
+    $haenv->write_lrm_status($results);
+}
+
 1;