2 Copyright (C) 2010 Proxmox Server Solutions GmbH
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 Author: Dietmar Maurer <dietmar@proxmox.com>
21 #define G_LOG_DOMAIN "loop"
25 #endif /* HAVE_CONFIG_H */
32 #include <sys/types.h>
39 #include "cfs-utils.h"
44 const char *log_domain
;
45 cfs_service_callbacks_t
*callbacks
;
51 enum qb_loop_priority priority
;
58 gboolean server_started
;
59 gboolean stop_worker_flag
;
60 GCond server_started_cond
;
61 GCond server_stopped_cond
;
62 GMutex server_started_mutex
;
69 cfs_service_set_timer(
70 cfs_service_t
*service
,
73 g_return_val_if_fail(service
!= NULL
, FALSE
);
75 service
->period
= period
;
81 cfs_service_get_context(cfs_service_t
*service
)
83 g_return_val_if_fail(service
!= NULL
, NULL
);
85 return service
->context
;
89 cfs_service_set_restartable(
90 cfs_service_t
*service
,
93 g_return_if_fail(service
!= NULL
);
95 service
->restartable
= restartable
;
100 cfs_service_callbacks_t
*callbacks
,
101 const char *log_domain
,
104 g_return_val_if_fail(callbacks
!= NULL
, NULL
);
105 g_return_val_if_fail(callbacks
->cfs_service_initialize_fn
!= NULL
, NULL
);
106 g_return_val_if_fail(callbacks
->cfs_service_finalize_fn
!= NULL
, NULL
);
107 g_return_val_if_fail(callbacks
->cfs_service_dispatch_fn
!= NULL
, NULL
);
109 cfs_service_t
*service
= g_new0(cfs_service_t
, 1);
114 service
->log_domain
= log_domain
;
116 service
->log_domain
= G_LOG_DOMAIN
;
118 service
->callbacks
= callbacks
;
120 service
->restartable
= TRUE
;
122 service
->context
= context
;
128 cfs_loop_new(struct fuse
*fuse
)
130 cfs_loop_t
*loop
= g_new0(cfs_loop_t
, 1);
132 g_mutex_init(&loop
->server_started_mutex
);
133 g_cond_init(&loop
->server_started_cond
);
134 g_cond_init(&loop
->server_stopped_cond
);
136 if (!(loop
->qbloop
= qb_loop_create())) {
137 cfs_critical("cant create event loop");
148 cfs_loop_destroy(cfs_loop_t
*loop
)
150 g_return_if_fail(loop
!= NULL
);
153 qb_loop_destroy(loop
->qbloop
);
156 g_list_free(loop
->services
);
158 g_mutex_clear(&loop
->server_started_mutex
);
159 g_cond_clear(&loop
->server_started_cond
);
160 g_cond_clear(&loop
->server_stopped_cond
);
166 cfs_loop_add_service(
168 cfs_service_t
*service
,
169 enum qb_loop_priority priority
)
171 g_return_val_if_fail(loop
!= NULL
, FALSE
);
172 g_return_val_if_fail(service
!= NULL
, FALSE
);
173 g_return_val_if_fail(service
->log_domain
!= NULL
, FALSE
);
175 service
->priority
= priority
;
176 service
->qbloop
= loop
->qbloop
;
178 loop
->services
= g_list_append(loop
->services
, service
);
189 cfs_service_t
*service
= (cfs_service_t
*)data
;
191 if (!service
->callbacks
->cfs_service_dispatch_fn(service
, service
->context
)) {
192 qb_loop_poll_del(service
->qbloop
, fd
);
193 service
->initialized
= FALSE
;
194 service
->errcount
= 0;
196 if (!service
->restartable
)
197 service
->callbacks
->cfs_service_finalize_fn(service
, service
->context
);
206 service_timer_job(void *data
)
208 g_return_if_fail(data
!= NULL
);
210 cfs_loop_t
*loop
= (cfs_loop_t
*)data
;
211 qb_loop_t
*qbloop
= loop
->qbloop
;
213 gboolean terminate
= FALSE
;
215 g_mutex_lock (&loop
->server_started_mutex
);
217 if (loop
->stop_worker_flag
) {
218 cfs_debug ("got terminate request");
219 qb_loop_stop(qbloop
);
220 loop
->server_started
= 0;
221 g_cond_signal (&loop
->server_stopped_cond
);
223 } else if (!loop
->server_started
) {
224 loop
->server_started
= 1;
225 g_cond_signal (&loop
->server_started_cond
);
228 g_mutex_unlock (&loop
->server_started_mutex
);
233 GList
*l
= loop
->services
;
235 cfs_service_t
*service
= (cfs_service_t
*)l
->data
;
238 if (!service
->initialized
)
241 time_t ctime
= time(NULL
);
242 if (service
->period
&& service
->callbacks
->cfs_service_timer_fn
&&
243 ((ctime
- service
->last_timeout
) >= service
->period
)) {
244 service
->last_timeout
= ctime
;
245 service
->callbacks
->cfs_service_timer_fn(service
, service
->context
);
249 qb_loop_timer_handle th
;
250 qb_loop_timer_add(qbloop
, QB_LOOP_LOW
, 1000000000, data
, service_timer_job
, &th
);
254 service_start_job(void *data
)
256 g_return_if_fail(data
!= NULL
);
258 cfs_loop_t
*loop
= (cfs_loop_t
*)data
;
259 qb_loop_t
*qbloop
= loop
->qbloop
;
261 gboolean terminate
= FALSE
;
262 g_mutex_lock (&loop
->server_started_mutex
);
263 terminate
= loop
->stop_worker_flag
;
264 g_mutex_unlock (&loop
->server_started_mutex
);
269 GList
*l
= loop
->services
;
270 time_t ctime
= time(NULL
);
273 cfs_service_t
*service
= (cfs_service_t
*)l
->data
;
276 if (service
->restartable
&& !service
->initialized
&&
277 ((ctime
- service
->last_init
) > 5)) {
278 int fd
= service
->callbacks
->cfs_service_initialize_fn(service
, service
->context
);
279 service
->last_init
= ctime
;
282 service
->initialized
= TRUE
;
283 service
->errcount
= 0;
286 if ((res
= qb_loop_poll_add(qbloop
, service
->priority
, fd
, POLLIN
,
287 service
, poll_dispatch_fn
)) != 0) {
288 cfs_critical("qb_loop_poll_add failed: %s - disabling service",
290 service
->initialized
= FALSE
;
291 service
->restartable
= FALSE
;
292 service
->callbacks
->cfs_service_finalize_fn(service
, service
->context
);
295 if (!service
->errcount
)
296 cfs_dom_critical(service
->log_domain
, "can't initialize service");
302 qb_loop_timer_handle th
;
303 qb_loop_timer_add(qbloop
, QB_LOOP_LOW
, 1000000000, data
, service_start_job
, &th
);
307 cfs_loop_worker_thread(gpointer data
)
309 g_return_val_if_fail(data
!= NULL
, NULL
);
311 cfs_loop_t
*loop
= (cfs_loop_t
*)data
;
312 qb_loop_t
*qbloop
= loop
->qbloop
;
315 time_t ctime
= time(NULL
);
318 cfs_service_t
*service
= (cfs_service_t
*)l
->data
;
320 service
->last_timeout
= ctime
;
323 qb_loop_timer_handle th
;
324 qb_loop_timer_add(qbloop
, QB_LOOP_LOW
, 10000000, loop
, service_start_job
, &th
);
326 qb_loop_timer_add(qbloop
, QB_LOOP_LOW
, 1000000000, loop
, service_timer_job
, &th
);
328 cfs_debug("start loop");
332 cfs_debug("end loop");
336 cfs_service_t
*service
= (cfs_service_t
*)l
->data
;
338 service
->callbacks
->cfs_service_finalize_fn(service
, service
->context
);
345 cfs_loop_start_worker(cfs_loop_t
*loop
)
347 g_return_val_if_fail(loop
!= NULL
, FALSE
);
349 loop
->worker
= g_thread_new("cfs_loop", cfs_loop_worker_thread
, loop
);
351 g_mutex_lock (&loop
->server_started_mutex
);
352 while (!loop
->server_started
)
353 g_cond_wait (&loop
->server_started_cond
, &loop
->server_started_mutex
);
354 g_mutex_unlock (&loop
->server_started_mutex
);
356 cfs_debug("worker started");
362 cfs_loop_stop_worker(cfs_loop_t
*loop
)
364 g_return_if_fail(loop
!= NULL
);
366 cfs_debug("cfs_loop_stop_worker");
368 g_mutex_lock (&loop
->server_started_mutex
);
369 loop
->stop_worker_flag
= TRUE
;
370 while (loop
->server_started
)
371 g_cond_wait (&loop
->server_stopped_cond
, &loop
->server_started_mutex
);
372 g_mutex_unlock (&loop
->server_started_mutex
);
375 g_thread_join(loop
->worker
);