]> git.proxmox.com Git - pve-cluster.git/blob - data/src/loop.c
pmxcfs: update copyright in license header
[pve-cluster.git] / data / src / loop.c
1 /*
2 Copyright (C) 2010 - 2020 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19 */
20
21 #define G_LOG_DOMAIN "loop"
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif /* HAVE_CONFIG_H */
26
27 #include <stdlib.h>
28 #include <unistd.h>
29 #include <errno.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <sys/types.h>
33 #include <utime.h>
34 #include <sys/stat.h>
35 #include <glib.h>
36 #include <syslog.h>
37 #include <poll.h>
38
39 #include "cfs-utils.h"
40 #include "loop.h"
41
42 struct cfs_service {
43 qb_loop_t *qbloop;
44 const char *log_domain;
45 cfs_service_callbacks_t *callbacks;
46 gboolean restartable;
47 gpointer context;
48 gboolean initialized;
49 int errcount;
50 time_t last_init;
51 enum qb_loop_priority priority;
52 time_t period;
53 time_t last_timeout;
54 };
55
56 struct cfs_loop {
57 GThread *worker;
58 gboolean server_started;
59 gboolean stop_worker_flag;
60 GCond server_started_cond;
61 GCond server_stopped_cond;
62 GMutex server_started_mutex;
63 qb_loop_t *qbloop;
64 struct fuse *fuse;
65 GList *services;
66 };
67
68 gboolean
69 cfs_service_set_timer(
70 cfs_service_t *service,
71 time_t period)
72 {
73 g_return_val_if_fail(service != NULL, FALSE);
74
75 service->period = period;
76
77 return TRUE;
78 }
79
80 gpointer
81 cfs_service_get_context(cfs_service_t *service)
82 {
83 g_return_val_if_fail(service != NULL, NULL);
84
85 return service->context;
86 }
87
88 void
89 cfs_service_set_restartable(
90 cfs_service_t *service,
91 gboolean restartable)
92 {
93 g_return_if_fail(service != NULL);
94
95 service->restartable = restartable;
96 }
97
98 cfs_service_t *
99 cfs_service_new(
100 cfs_service_callbacks_t *callbacks,
101 const char *log_domain,
102 gpointer context)
103 {
104 g_return_val_if_fail(callbacks != NULL, NULL);
105 g_return_val_if_fail(callbacks->cfs_service_initialize_fn != NULL, NULL);
106 g_return_val_if_fail(callbacks->cfs_service_finalize_fn != NULL, NULL);
107 g_return_val_if_fail(callbacks->cfs_service_dispatch_fn != NULL, NULL);
108
109 cfs_service_t *service = g_new0(cfs_service_t, 1);
110 if(!service)
111 return NULL;
112
113 if (log_domain)
114 service->log_domain = log_domain;
115 else
116 service->log_domain = G_LOG_DOMAIN;
117
118 service->callbacks = callbacks;
119
120 service->restartable = TRUE;
121
122 service->context = context;
123
124 return service;
125 }
126
127 cfs_loop_t *
128 cfs_loop_new(struct fuse *fuse)
129 {
130 cfs_loop_t *loop = g_new0(cfs_loop_t, 1);
131
132 g_mutex_init(&loop->server_started_mutex);
133 g_cond_init(&loop->server_started_cond);
134 g_cond_init(&loop->server_stopped_cond);
135
136 if (!(loop->qbloop = qb_loop_create())) {
137 cfs_critical("cant create event loop");
138 g_free(loop);
139 return NULL;
140 }
141
142 loop->fuse = fuse;
143
144 return loop;
145 }
146
147 void
148 cfs_loop_destroy(cfs_loop_t *loop)
149 {
150 g_return_if_fail(loop != NULL);
151
152 if (loop->qbloop)
153 qb_loop_destroy(loop->qbloop);
154
155 if(loop->services)
156 g_list_free(loop->services);
157
158 g_mutex_clear(&loop->server_started_mutex);
159 g_cond_clear(&loop->server_started_cond);
160 g_cond_clear(&loop->server_stopped_cond);
161
162 g_free(loop);
163 }
164
165 gboolean
166 cfs_loop_add_service(
167 cfs_loop_t *loop,
168 cfs_service_t *service,
169 enum qb_loop_priority priority)
170 {
171 g_return_val_if_fail(loop != NULL, FALSE);
172 g_return_val_if_fail(service != NULL, FALSE);
173 g_return_val_if_fail(service->log_domain != NULL, FALSE);
174
175 service->priority = priority;
176 service->qbloop = loop->qbloop;
177
178 loop->services = g_list_append(loop->services, service);
179
180 return TRUE;
181 }
182
183 static int32_t
184 poll_dispatch_fn(
185 int32_t fd,
186 int32_t revents,
187 void *data)
188 {
189 cfs_service_t *service = (cfs_service_t *)data;
190
191 if (!service->callbacks->cfs_service_dispatch_fn(service, service->context)) {
192 qb_loop_poll_del(service->qbloop, fd);
193 service->initialized = FALSE;
194 service->errcount = 0;
195
196 if (!service->restartable)
197 service->callbacks->cfs_service_finalize_fn(service, service->context);
198
199 return -1;
200 }
201
202 return 0;
203 }
204
205 static void
206 service_timer_job(void *data)
207 {
208 g_return_if_fail(data != NULL);
209
210 cfs_loop_t *loop = (cfs_loop_t *)data;
211 qb_loop_t *qbloop = loop->qbloop;
212
213 gboolean terminate = FALSE;
214
215 g_mutex_lock (&loop->server_started_mutex);
216
217 if (loop->stop_worker_flag) {
218 cfs_debug ("got terminate request");
219 qb_loop_stop(qbloop);
220 loop->server_started = 0;
221 g_cond_signal (&loop->server_stopped_cond);
222 terminate = TRUE;
223 } else if (!loop->server_started) {
224 loop->server_started = 1;
225 g_cond_signal (&loop->server_started_cond);
226 }
227
228 g_mutex_unlock (&loop->server_started_mutex);
229
230 if (terminate)
231 return;
232
233 GList *l = loop->services;
234 while (l) {
235 cfs_service_t *service = (cfs_service_t *)l->data;
236 l = g_list_next(l);
237
238 if (!service->initialized)
239 continue;
240
241 time_t ctime = time(NULL);
242 if (service->period && service->callbacks->cfs_service_timer_fn &&
243 ((ctime - service->last_timeout) >= service->period)) {
244 service->last_timeout = ctime;
245 service->callbacks->cfs_service_timer_fn(service, service->context);
246 }
247 }
248
249 qb_loop_timer_handle th;
250 qb_loop_timer_add(qbloop, QB_LOOP_LOW, 1000000000, data, service_timer_job, &th);
251 }
252
253 static void
254 service_start_job(void *data)
255 {
256 g_return_if_fail(data != NULL);
257
258 cfs_loop_t *loop = (cfs_loop_t *)data;
259 qb_loop_t *qbloop = loop->qbloop;
260
261 gboolean terminate = FALSE;
262 g_mutex_lock (&loop->server_started_mutex);
263 terminate = loop->stop_worker_flag;
264 g_mutex_unlock (&loop->server_started_mutex);
265
266 if (terminate)
267 return;
268
269 GList *l = loop->services;
270 time_t ctime = time(NULL);
271
272 while (l) {
273 cfs_service_t *service = (cfs_service_t *)l->data;
274 l = g_list_next(l);
275
276 if (service->restartable && !service->initialized &&
277 ((ctime - service->last_init) > 5)) {
278 int fd = service->callbacks->cfs_service_initialize_fn(service, service->context);
279 service->last_init = ctime;
280
281 if (fd >= 0) {
282 service->initialized = TRUE;
283 service->errcount = 0;
284
285 int res;
286 if ((res = qb_loop_poll_add(qbloop, service->priority, fd, POLLIN,
287 service, poll_dispatch_fn)) != 0) {
288 cfs_critical("qb_loop_poll_add failed: %s - disabling service",
289 g_strerror(-res));
290 service->initialized = FALSE;
291 service->restartable = FALSE;
292 service->callbacks->cfs_service_finalize_fn(service, service->context);
293 }
294 } else {
295 if (!service->errcount)
296 cfs_dom_critical(service->log_domain, "can't initialize service");
297 service->errcount++;
298 }
299 }
300 }
301
302 qb_loop_timer_handle th;
303 qb_loop_timer_add(qbloop, QB_LOOP_LOW, 1000000000, data, service_start_job, &th);
304 }
305
306 static gpointer
307 cfs_loop_worker_thread(gpointer data)
308 {
309 g_return_val_if_fail(data != NULL, NULL);
310
311 cfs_loop_t *loop = (cfs_loop_t *)data;
312 qb_loop_t *qbloop = loop->qbloop;
313
314 GList *l;
315 time_t ctime = time(NULL);
316 l = loop->services;
317 while (l) {
318 cfs_service_t *service = (cfs_service_t *)l->data;
319 l = g_list_next(l);
320 service->last_timeout = ctime;
321 }
322
323 qb_loop_timer_handle th;
324 qb_loop_timer_add(qbloop, QB_LOOP_LOW, 10000000, loop, service_start_job, &th);
325
326 qb_loop_timer_add(qbloop, QB_LOOP_LOW, 1000000000, loop, service_timer_job, &th);
327
328 cfs_debug("start loop");
329
330 qb_loop_run(qbloop);
331
332 cfs_debug("end loop");
333
334 l = loop->services;
335 while (l) {
336 cfs_service_t *service = (cfs_service_t *)l->data;
337 l = g_list_next(l);
338 service->callbacks->cfs_service_finalize_fn(service, service->context);
339 }
340
341 return NULL;
342 }
343
344 gboolean
345 cfs_loop_start_worker(cfs_loop_t *loop)
346 {
347 g_return_val_if_fail(loop != NULL, FALSE);
348
349 loop->worker = g_thread_new("cfs_loop", cfs_loop_worker_thread, loop);
350
351 g_mutex_lock (&loop->server_started_mutex);
352 while (!loop->server_started)
353 g_cond_wait (&loop->server_started_cond, &loop->server_started_mutex);
354 g_mutex_unlock (&loop->server_started_mutex);
355
356 cfs_debug("worker started");
357
358 return TRUE;
359 }
360
361 void
362 cfs_loop_stop_worker(cfs_loop_t *loop)
363 {
364 g_return_if_fail(loop != NULL);
365
366 cfs_debug("cfs_loop_stop_worker");
367
368 g_mutex_lock (&loop->server_started_mutex);
369 loop->stop_worker_flag = TRUE;
370 while (loop->server_started)
371 g_cond_wait (&loop->server_stopped_cond, &loop->server_started_mutex);
372 g_mutex_unlock (&loop->server_started_mutex);
373
374 if (loop->worker) {
375 g_thread_join(loop->worker);
376 loop->worker = NULL;
377 }
378 }