]> git.proxmox.com Git - pve-guest-common.git/blob - PVE/ReplicationState.pm
PVE/ReplicationState.pm: implement write_vmid_job_states
[pve-guest-common.git] / PVE / ReplicationState.pm
1 package PVE::ReplicationState;
2
3 use warnings;
4 use strict;
5 use JSON;
6
7 use PVE::INotify;
8 use PVE::Tools;
9 use PVE::CalendarEvent;
10 use PVE::Cluster;
11 use PVE::GuestHelpers;
12 use PVE::ReplicationConfig;
13
# Note: regression tests can overwrite $state_path for testing
# JSON file holding the state of all replication jobs, keyed by guest ID and
# then by the plugin's unique target ID (see write_job_state()).
our $state_path = "/var/lib/pve-manager/pve-replication-state.json";
# lock file taken (via PVE::Tools::lock_file) around every read-modify-write
# update of $state_path
our $state_lock = "/var/lib/pve-manager/pve-replication-state.lck";
# directory containing one log file per replication job (see job_logfile_name())
our $replicate_logdir = "/var/log/pve/replicate";
18
# Return the path of the log file used by replication job $jobid.
# regression tests should overwrite this
sub job_logfile_name {
    my ($jobid) = @_;

    return join('/', $replicate_logdir, $jobid);
}
25
# Note: We use PVE::Tools::file_set_contents to write the state file
# atomically, so read_state() always returns a consistent copy (even when
# not locked).

# Read and decode the replication state file.
# Returns a hash ref (empty if the file does not exist, is empty, or contains
# only whitespace). Dies if the content is not a single JSON object.
sub read_state {

    return {} if ! -e $state_path;

    my $raw = PVE::Tools::file_get_contents($state_path);

    return {} if $raw !~ m/\S/;

    # untaint $raw - the whole content must be one JSON object; /s allows it
    # to span multiple lines (e.g. a pretty-printed or hand-edited file),
    # trailing whitespace/newlines are tolerated
    if ($raw =~ m/\A(\{.*\})\s*\z/s) {
        return decode_json($1);
    }

    die "invalid json data in '$state_path'\n";
}
44
# Return the state hash of a single replication job out of $stateobj, looked
# up by the job's guest ID and the plugin's unique target ID.
# Counters that are not set yet (last_iteration, last_try, last_sync,
# fail_count) are initialized to 0.
# Note: if an entry exists, the returned hash IS that entry inside $stateobj
# (not a copy), so the defaults are filled in there as well; looking up a
# guest without any state autovivifies an empty entry for it in $stateobj.
sub extract_job_state {
    my ($stateobj, $jobcfg) = @_;

    my $target_id = PVE::ReplicationConfig->lookup($jobcfg->{type})
        ->get_unique_target_id($jobcfg);

    my $job_state = $stateobj->{$jobcfg->{guest}}->{$target_id} || {};

    # last_try:  last sync start time
    # last_sync: last successful sync start time
    $job_state->{$_} //= 0 for qw(last_iteration last_try last_sync fail_count);

    return $job_state;
}
63
# Read the state file and return the (defaulted) state of the job $jobcfg.
sub read_job_state {
    my ($jobcfg) = @_;

    return extract_job_state(read_state(), $jobcfg);
}
70
# Shared read-modify-write helper for the state file.
# Reads the current state object, passes it to $modify_fn (which changes it
# in place) and writes the result back with file_set_contents. The update
# runs under PVE::Tools::lock_file($state_lock) with timeout 10, and the
# whole operation runs inside the guest_migration_lock of $vmid.
# Errors from the locked update are re-thrown.
sub _update_state_locked {
    my ($vmid, $modify_fn) = @_;

    my $update = sub {
        my $stateobj = read_state();
        $modify_fn->($stateobj);
        PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
    };

    my $code = sub {
        PVE::Tools::lock_file($state_lock, 10, $update);
        die $@ if $@; # lock_file reports failures via $@
    };

    # make sure we have guest_migration_lock during update
    PVE::GuestHelpers::guest_migration_lock($vmid, undef, $code);
}

# update state for a single job
# An undefined $state removes the job's entry (and the guest's entry once it
# holds no job states anymore) instead of storing a JSON null.
sub write_job_state {
    my ($jobcfg, $state) = @_;

    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});

    my $vmid = $jobcfg->{guest};
    my $tid = $plugin->get_unique_target_id($jobcfg);

    _update_state_locked($vmid, sub {
        my ($stateobj) = @_;

        # Note: tuple ($vmid, $tid) is unique
        if (defined($state)) {
            $stateobj->{$vmid}->{$tid} = $state;
        } elsif (my $vmid_states = $stateobj->{$vmid}) {
            delete $vmid_states->{$tid};
            delete $stateobj->{$vmid} if !%$vmid_states;
        }
    });
}

# update all job states related to a specific $vmid
# $vmid_state replaces the guest's complete state hash (target ID => job
# state); an undefined $vmid_state removes the guest's entry.
sub write_vmid_job_states {
    my ($vmid_state, $vmid) = @_;

    _update_state_locked($vmid, sub {
        my ($stateobj) = @_;

        if (defined($vmid_state)) {
            $stateobj->{$vmid} = $vmid_state;
        } else {
            delete $stateobj->{$vmid};
        }
    });
}
116
# Build the name of the replication snapshot of job $jobid taken at $last_sync.
# In list context returns ($prefix, $snapname), where $prefix depends only on
# $jobid; in scalar context returns just $snapname.
sub replication_snapshot_name {
    my ($jobid, $last_sync) = @_;

    my $prefix = '__replicate_' . $jobid . '_';
    my $snapname = $prefix . $last_sync . '__';

    return wantarray ? ($prefix, $snapname) : $snapname;
}
125
# Compute the next sync time of a job from its config and state.
# Returns 0 when the job should not be scheduled.
sub _job_next_sync {
    my ($jobcfg, $state) = @_;

    # removal jobs get the lowest possible value
    # todo: consider fail_count? How many retries?
    return 1 if $jobcfg->{remove_job};

    my $fail_count = $state->{fail_count};

    if (!$fail_count) {
        my $calspec = PVE::CalendarEvent::parse_calendar_event($jobcfg->{schedule} || '*/15');
        return PVE::CalendarEvent::compute_next_event($calspec, $state->{last_try}) // 0;
    }

    # after a failure retry 5 minutes * fail_count after the last try;
    # from the 3rd failure on the job is not scheduled anymore (0)
    return $fail_count < 3 ? $state->{last_try} + 5*60*$fail_count : 0;
}

# Collect the replication jobs relevant for the local node.
# Returns a hash ref job ID => job config, each config extended with
# 'state', 'id', 'vmtype' and 'next_sync'. Jobs are skipped when their guest
# does not exist or is on another node, and - unless it is a removal job -
# when the target is the local node or the job is disabled.
# Dies for job types other than 'local'.
sub job_status {
    my $local_node = PVE::INotify::nodename();
    my $stateobj = read_state();
    my $cfg = PVE::ReplicationConfig->new();
    my $vms = PVE::Cluster::get_vmlist();

    my $jobs = {};

    for my $jobid (sort keys %{$cfg->{ids}}) {
        my $jobcfg = $cfg->{ids}->{$jobid};

        die "internal error - not implemented" if $jobcfg->{type} ne 'local';

        my $vminfo = $vms->{ids}->{$jobcfg->{guest}};
        next if !$vminfo; # guest does not exist
        next if $vminfo->{node} ne $local_node; # guest is not on this node

        if (!$jobcfg->{remove_job}) {
            next if $jobcfg->{target} eq $local_node; # never sync to local node
            next if $jobcfg->{disable};
        }

        my $state = extract_job_state($stateobj, $jobcfg);

        $jobcfg->{state} = $state;
        $jobcfg->{id} = $jobid;
        $jobcfg->{vmtype} = $vminfo->{type};
        $jobcfg->{next_sync} = _job_next_sync($jobcfg, $state);

        $jobs->{$jobid} = $jobcfg;
    }

    return $jobs;
}
186
# Return the next job config (as returned by job_status()) that should run in
# replication iteration $iteration at time $start_time, or undef if none.
# Jobs already handled in this iteration (last_iteration >= $iteration), jobs
# without a next_sync time and jobs whose next_sync lies after $start_time
# are skipped.
# Candidates are ordered by last_iteration, then next_sync, then guest ID,
# then job ID, so the order does not depend on hash key order.
sub get_next_job {
    my ($iteration, $start_time) = @_;

    my $jobs = job_status();

    # Note: all keys are compared numerically - last_iteration is a counter,
    # and a string comparison would sort e.g. 10 before 9.
    my $sort_func = sub {
        my $joba = $jobs->{$a};
        my $jobb = $jobs->{$b};

        return $joba->{state}->{last_iteration} <=> $jobb->{state}->{last_iteration}
            || $joba->{next_sync} <=> $jobb->{next_sync}
            || $joba->{guest} <=> $jobb->{guest}
            || $a cmp $b;
    };

    foreach my $jobid (sort $sort_func keys %$jobs) {
        my $jobcfg = $jobs->{$jobid};
        next if $jobcfg->{state}->{last_iteration} >= $iteration;
        if ($jobcfg->{next_sync} && ($start_time >= $jobcfg->{next_sync})) {
            return $jobcfg;
        }
    }

    return undef;
}
214
215 1;