2 Copyright (C) 2010 Proxmox Server Solutions GmbH
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 Author: Dietmar Maurer <dietmar@proxmox.com>
21 #define G_LOG_DOMAIN "ipcs"
25 #endif /* HAVE_CONFIG_H */
30 #include <sys/syslog.h>
33 #include <qb/qbdefs.h>
34 #include <qb/qbutil.h>
35 #include <qb/qbloop.h>
36 #include <qb/qbipcs.h>
40 #include "cfs-utils.h"
/* File-scope state shared by the IPC server functions in this file. */
/* Handle of the worker thread; assigned from g_thread_new() in server_start()
 * and passed to g_thread_join() in server_stop(). */
45 static GThread
*worker
;
/* libqb main loop; created with qb_loop_create() in server_start() and
 * released with qb_loop_destroy() in server_stop(). */
46 static qb_loop_t
*loop
;
/* IPC service handle returned by qb_ipcs_create() in server_start(). */
47 static qb_ipcs_service_t
* s1
;
/* Reply buffer reused for every request: s1_msg_process_fn() truncates it to
 * length 0 before dispatching and the message builders append to it. */
48 static GString
*outbuf
;
/* Database handle passed to memdb_read() by the request handler.
 * NOTE(review): presumably set from server_start()'s db argument; that
 * assignment is not visible in this listing. */
49 static memdb_t
*memdb
;
/* Start/stop handshake flags, read and written under server_started_mutex. */
51 static int server_started
= 0; /* protect with server_started_mutex */
52 static int terminate_server
= 0; /* protect with server_started_mutex */
/* Signalled by timer_job() when it finds server_started still unset;
 * server_start() waits on it. */
53 static GCond server_started_cond
;
/* Signalled by timer_job() after a terminate request; server_stop() waits on it. */
54 static GCond server_stopped_cond
;
/* Mutex paired with both condition variables above. */
55 static GMutex server_started_mutex
;
/* Request layout used by s1_msg_process_fn() for message id 4 (status update).
 * Starts with the generic libqb request header; the handler also accesses a
 * fixed-size `name` array member, whose declaration is not visible in this
 * listing. */
59 struct qb_ipc_request_header req_header
;
61 } cfs_status_update_request_header_t
;
/* Request layout used by s1_msg_process_fn() for message id 5 (status query).
 * Starts with the generic libqb request header; the handler also accesses
 * fixed-size `name` and `nodename` array members, whose declarations are not
 * visible in this listing. */
64 struct qb_ipc_request_header req_header
;
67 } cfs_status_get_request_header_t
;
/* Request layout used by s1_msg_process_fn() for message id 7 (log message).
 * Starts with the generic libqb request header; the handler also accesses the
 * members `priority`, `ident_len`, `tag_len` and a trailing `data` area, whose
 * declarations are not visible in this listing. */
70 struct qb_ipc_request_header req_header
;
75 } cfs_log_msg_request_header_t
;
/* Request layout used by s1_msg_process_fn() for message id 8 (log dump).
 * Starts with the generic libqb request header; the handler also accesses a
 * `max_entries` member, whose declaration is not visible in this listing. */
78 struct qb_ipc_request_header req_header
;
83 } cfs_log_get_request_header_t
;
/*
 * Connection-accept callback registered in service_handlers.
 *
 * A client is accepted when it is uid 0 / gid 0, or when its gid equals
 * cfs.gid. For an accepted client a zero-initialised struct s1_context is
 * allocated, filled in and attached to the connection. Any other client is
 * logged as rejected.
 *
 * NOTE(review): the uid/gid parameter declarations and the return statements
 * are not visible in this listing.
 */
93 static int32_t s1_connection_accept_fn(
94 qb_ipcs_connection_t
*c
,
98 if ((uid
== 0 && gid
== 0) || (gid
== cfs
.gid
)) {
99 cfs_debug("authenticated connection %d/%d", uid
, gid
);
100 struct s1_context
*ctx
= g_new0(struct s1_context
, 1);
/* read-only status depends on the gid alone: any client whose gid equals
 * cfs.gid is read-only, whatever its uid */
103 ctx
->read_only
= (gid
== cfs
.gid
);
/* remember the peer's pid; the log-message handler passes it to clog_pack() */
105 struct qb_ipcs_connection_stats stats
;
106 qb_ipcs_connection_stats_get(c
, &stats
, QB_FALSE
);
107 ctx
->client_pid
= stats
.client_pid
;
/* attach the context so later callbacks can fetch it with qb_ipcs_context_get() */
109 qb_ipcs_context_set(c
, ctx
);
112 cfs_critical("connection from bad user %d! - rejected", uid
);
/*
 * Connection-created callback registered in service_handlers.
 * Reads the service-wide statistics of s1 and writes the active and closed
 * connection counts to the debug log.
 */
116 static void s1_connection_created_fn(
117 qb_ipcs_connection_t
*c
)
119 struct qb_ipcs_stats srv_stats
;
121 qb_ipcs_stats_get(s1
, &srv_stats
, QB_FALSE
);
123 cfs_debug("Connection created > active:%d > closed:%d",
124 srv_stats
.active_connections
,
125 srv_stats
.closed_connections
);
/*
 * Connection-destroyed callback registered in service_handlers.
 * Logs the event and fetches the context that was attached to the connection
 * in s1_connection_accept_fn().
 *
 * NOTE(review): the declaration of ctx and the statement executed when a
 * context is found (presumably releasing it) are not visible in this listing.
 */
128 static void s1_connection_destroyed_fn(
129 qb_ipcs_connection_t
*c
)
131 cfs_debug("connection about to be freed");
134 if ((ctx
= qb_ipcs_context_get(c
)))
/*
 * Connection-closed callback registered in service_handlers.
 * Reads the per-connection statistics and logs the client's pid.
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
139 static int32_t s1_connection_closed_fn(
140 qb_ipcs_connection_t
*c
)
142 struct qb_ipcs_connection_stats stats
;
144 qb_ipcs_connection_stats_get(c
, &stats
, QB_FALSE
);
146 cfs_debug("Connection to pid:%d destroyed", stats
.client_pid
);
/*
 * Message callback registered in service_handlers: decodes one request,
 * dispatches on req_pt->id, and sends a single response that echoes the
 * request id and carries `result` in its error field.
 *
 * Request ids with a visible branch:
 *    1  cfs_create_version_msg()       2  cfs_create_memberlist_msg()
 *    3  cfs_create_vmlist_msg()        4  cfs_status_set()
 *    5  cfs_create_status_msg()        6  memdb_read()
 *    7  clog_pack() + cfs_cluster_log()
 *    8  cfs_cluster_log_dump()        10  cfs_rrd_dump()
 *
 * `result` starts out as -ECHRNG.
 *
 * NOTE(review): this listing omits many lines of the function (the remaining
 * parameters, the bodies of the size/permission checks, the `else` lines and
 * the return statements), so the comments below describe only the visible
 * statements.
 */
151 static int32_t s1_msg_process_fn(
152 qb_ipcs_connection_t
*c
,
/* the payload begins with the generic libqb request header */
156 struct qb_ipc_request_header
*req_pt
=
157 (struct qb_ipc_request_header
*)data
;
/* per-connection context attached by s1_connection_accept_fn() */
159 struct s1_context
*ctx
= (struct s1_context
*)qb_ipcs_context_get(c
);
/* NOTE(review): the guarding condition is not visible; presumably a missing context */
162 cfs_critical("qb_ipcs_context_get failed");
163 qb_ipcs_disconnect(c
);
167 cfs_debug("process msg:%d, size:%d", req_pt
->id
, req_pt
->size
);
/* reuse the shared reply buffer: drop whatever the previous request left in it */
171 g_string_truncate(outbuf
, 0);
173 int32_t result
= -ECHRNG
;
/* id 1: version info; the request must be exactly one bare header */
174 if (req_pt
->id
== 1) {
176 if (req_pt
->size
!= sizeof(struct qb_ipc_request_header
)) {
179 result
= cfs_create_version_msg(outbuf
);
/* id 2: member list; the request must be exactly one bare header */
182 } else if (req_pt
->id
== 2) {
184 if (req_pt
->size
!= sizeof(struct qb_ipc_request_header
)) {
187 result
= cfs_create_memberlist_msg(outbuf
);
/* id 3: VM list; the request must be exactly one bare header */
190 } else if (req_pt
->id
== 3) {
192 if (req_pt
->size
!= sizeof(struct qb_ipc_request_header
)) {
195 result
= cfs_create_vmlist_msg(outbuf
);
/* id 4: store a status value; the payload follows the fixed-size header */
197 } else if (req_pt
->id
== 4) {
199 cfs_status_update_request_header_t
*rh
=
200 (cfs_status_update_request_header_t
*)data
;
/* number of payload bytes after the fixed header; negative if the request is too short */
202 int datasize
= req_pt
->size
- sizeof(cfs_status_update_request_header_t
);
/* read-only clients and undersized requests take separate branches (bodies not visible) */
204 if (ctx
->read_only
) {
206 } else if (datasize
< 0) {
209 /* make sure name is 0 terminated */
210 rh
->name
[sizeof(rh
->name
) - 1] = 0;
212 char *dataptr
= data
+ sizeof(cfs_status_update_request_header_t
);
214 result
= cfs_status_set(rh
->name
, dataptr
, datasize
);
/* id 5: read a status value, selected by nodename and name */
216 } else if (req_pt
->id
== 5) {
218 cfs_status_get_request_header_t
*rh
=
219 (cfs_status_get_request_header_t
*)data
;
/* NOTE(review): the check that uses datasize is not visible in this listing */
221 int datasize
= req_pt
->size
- sizeof(cfs_status_get_request_header_t
);
226 /* make sure all names are 0 terminated */
227 rh
->name
[sizeof(rh
->name
) - 1] = 0;
228 rh
->nodename
[sizeof(rh
->nodename
) - 1] = 0;
230 result
= cfs_create_status_msg(outbuf
, rh
->nodename
, rh
->name
);
/* id 6: read one memdb entry; the path string follows the bare header */
232 } else if (req_pt
->id
== 6) {
234 int pathlen
= req_pt
->size
- sizeof(struct qb_ipc_request_header
);
/* NOTE(review): index req_pt->size is one byte past the last byte of a message
 * of req_pt->size bytes; confirm the receive buffer has a spare byte, or
 * whether req_pt->size - 1 was intended */
239 /* make sure path is 0 terminated */
240 ((char *)data
)[req_pt
->size
] = 0;
241 char *path
= data
+ sizeof(struct qb_ipc_request_header
);
/* read-only clients get a separate branch for private paths (body not visible) */
243 if (ctx
->read_only
&& path_is_private(path
)) {
/* NOTE(review): the declaration of tmp and any check of result before the
 * append are not visible in this listing */
247 result
= memdb_read(memdb
, path
, &tmp
);
249 g_string_append_len(outbuf
, tmp
, result
);
/* id 7: add an entry to the cluster log; the data area holds ident, tag and
 * message back to back */
254 } else if (req_pt
->id
== 7) {
256 cfs_log_msg_request_header_t
*rh
=
257 (cfs_log_msg_request_header_t
*)data
;
/* datasize: bytes from the start of rh->data to the end of the request;
 * msg_len: what remains after the ident and tag strings */
259 int datasize
= req_pt
->size
- G_STRUCT_OFFSET(cfs_log_msg_request_header_t
, data
);
260 int msg_len
= datasize
- rh
->ident_len
- rh
->tag_len
;
262 if (ctx
->read_only
) {
264 } else if (msg_len
< 1) {
267 char *msg
= rh
->data
;
/* ident and tag must end in a 0 byte at the positions given by their lengths.
 * NOTE(review): no visible check that ident_len is non-zero before indexing
 * ident_len - 1; the third test reads data[req_pt->size], one byte past the
 * request (same concern as in the id 6 branch) */
268 if ((msg
[rh
->ident_len
- 1] == 0) &&
269 (msg
[rh
->ident_len
+ rh
->tag_len
- 1] == 0) &&
270 (((char *)data
)[req_pt
->size
] == 0)) {
/* tag starts right after ident; msg is then advanced past both.
 * NOTE(review): the declaration of `ident` is not visible in this listing */
273 char *tag
= msg
+ rh
->ident_len
;
274 msg
= msg
+ rh
->ident_len
+ rh
->tag_len
;
/* the entry is built in CLOG_MAX_ENTRY_SIZE bytes obtained from alloca() and
 * stamped with the current time and the pid recorded at accept time */
276 time_t ctime
= time(NULL
);
277 clog_entry_t
*entry
= (clog_entry_t
*)alloca(CLOG_MAX_ENTRY_SIZE
);
278 if (clog_pack(entry
, cfs
.nodename
, ident
, tag
, ctx
->client_pid
,
279 ctime
, rh
->priority
, msg
)) {
280 cfs_cluster_log(entry
);
/* id 8: dump the cluster log; a user string follows the fixed-size header */
289 } else if (req_pt
->id
== 8) {
291 cfs_log_get_request_header_t
*rh
=
292 (cfs_log_get_request_header_t
*)data
;
/* NOTE(review): the check that uses userlen is not visible in this listing */
294 int userlen
= req_pt
->size
- sizeof(cfs_log_get_request_header_t
);
/* NOTE(review): writes data[req_pt->size], one byte past the request (same
 * concern as in the id 6 branch) */
299 /* make sure user string is 0 terminated */
300 ((char *)data
)[req_pt
->size
] = 0;
301 char *user
= data
+ sizeof(cfs_log_get_request_header_t
);
/* a max_entries of 0 selects the default of 50 */
303 uint32_t max
= rh
->max_entries
? rh
->max_entries
: 50;
304 cfs_cluster_log_dump(outbuf
, user
, max
);
/* id 10: RRD dump; the request must be exactly one bare header */
307 } else if (req_pt
->id
== 10) {
309 if (req_pt
->size
!= sizeof(struct qb_ipc_request_header
)) {
312 cfs_rrd_dump(outbuf
);
317 cfs_debug("process result %d", result
);
/* Build the response from two iovecs: the response header, then the payload.
 * NOTE(review): the declarations of iov_len and resp are not visible in this
 * listing */
325 struct iovec iov
[iov_len
];
326 struct qb_ipc_response_header res_header
;
/* the payload length is outbuf->len when resp is non-NULL, otherwise 0 */
328 int resp_data_len
= resp
? outbuf
->len
: 0;
330 res_header
.id
= req_pt
->id
;
331 res_header
.size
= sizeof(res_header
) + resp_data_len
;
332 res_header
.error
= result
;
334 iov
[0].iov_base
= (char *)&res_header
;
335 iov
[0].iov_len
= sizeof(res_header
);
336 iov
[1].iov_base
= resp
;
337 iov
[1].iov_len
= resp_data_len
;
339 ssize_t res
= qb_ipcs_response_sendv(c
, iov
, iov_len
);
/* NOTE(review): the condition guarding this error path is not visible;
 * presumably a failed send */
341 cfs_critical("qb_ipcs_response_send: %s", strerror(errno
));
342 qb_ipcs_disconnect(c
);
/*
 * job_add hook registered in poll_handlers: forwards to qb_loop_job_add() on
 * the file-scope loop and returns its result.
 *
 * NOTE(review): the declaration of the `data` parameter is not visible in
 * this listing.
 */
348 static int32_t my_job_add(
349 enum qb_loop_priority p
,
351 qb_loop_job_dispatch_fn fn
)
353 return qb_loop_job_add(loop
, p
, data
, fn
);
/*
 * dispatch_add hook registered in poll_handlers: forwards to
 * qb_loop_poll_add() on the file-scope loop and returns its result.
 *
 * NOTE(review): the declarations of the `fd`, `evts` and `data` parameters
 * are not visible in this listing.
 */
356 static int32_t my_dispatch_add(
357 enum qb_loop_priority p
,
361 qb_ipcs_dispatch_fn_t fn
)
363 return qb_loop_poll_add(loop
, p
, fd
, evts
, data
, fn
);
/*
 * dispatch_mod hook registered in poll_handlers: forwards to
 * qb_loop_poll_mod() on the file-scope loop and returns its result.
 *
 * NOTE(review): the declarations of the `fd`, `evts` and `data` parameters
 * are not visible in this listing.
 */
366 static int32_t my_dispatch_mod(
367 enum qb_loop_priority p
,
371 qb_ipcs_dispatch_fn_t fn
)
373 return qb_loop_poll_mod(loop
, p
, fd
, evts
, data
, fn
);
/*
 * dispatch_del hook registered in poll_handlers: forwards to
 * qb_loop_poll_del() on the file-scope loop and returns its result.
 *
 * NOTE(review): the declaration of the `fd` parameter is not visible in this
 * listing.
 */
376 static int32_t my_dispatch_del(
379 return qb_loop_poll_del(loop
, fd
);
/* Connection life-cycle and message callbacks passed to qb_ipcs_create() in
 * server_start(). */
382 static struct qb_ipcs_service_handlers service_handlers
= {
383 .connection_accept
= s1_connection_accept_fn
,
384 .connection_created
= s1_connection_created_fn
,
385 .msg_process
= s1_msg_process_fn
,
386 .connection_destroyed
= s1_connection_destroyed_fn
,
387 .connection_closed
= s1_connection_closed_fn
,
/* Event-loop hooks installed with qb_ipcs_poll_handlers_set() in
 * server_start(); each one forwards to the file-scope qb loop. */
390 static struct qb_ipcs_poll_handlers poll_handlers
= {
391 .job_add
= my_job_add
,
392 .dispatch_add
= my_dispatch_add
,
393 .dispatch_mod
= my_dispatch_mod
,
394 .dispatch_del
= my_dispatch_del
,
/*
 * Timer callback run on the qb loop; implements the start/stop handshake with
 * server_start() and server_stop().
 *
 * With server_started_mutex held:
 *  - if terminate_server is set, the IPC service s1 is destroyed and
 *    server_stopped_cond is signalled;
 *  - otherwise, if server_started is still unset, server_started_cond is
 *    signalled.
 * Afterwards the timer is armed again with a duration of 1000000000 (libqb
 * timer durations are in nanoseconds, i.e. one second).
 *
 * NOTE(review): the updates of server_started and of the local `terminate`
 * flag, and any condition around the re-arm, are not visible in this listing.
 */
397 static void timer_job(void *data
)
399 gboolean terminate
= FALSE
;
401 g_mutex_lock (&server_started_mutex
);
403 if (terminate_server
) {
404 cfs_debug ("got terminate request");
410 qb_ipcs_destroy (s1
);
415 g_cond_signal (&server_stopped_cond
);
418 } else if (!server_started
) {
420 g_cond_signal (&server_started_cond
);
423 g_mutex_unlock (&server_started_mutex
);
428 qb_loop_timer_handle th
;
429 qb_loop_timer_add(loop
, QB_LOOP_LOW
, 1000000000, NULL
, timer_job
, &th
);
/*
 * Entry point of the thread created in server_start().
 * Returns NULL immediately if the loop has not been created. Otherwise it
 * schedules the first timer_job() with a duration of 1000 (nanoseconds in
 * libqb) and logs before and after the event loop.
 *
 * NOTE(review): the call that actually runs the loop and the final return are
 * not visible in this listing.
 */
432 static gpointer
worker_thread(gpointer data
)
434 g_return_val_if_fail(loop
!= NULL
, NULL
);
436 cfs_debug("start event loop");
440 qb_loop_timer_handle th
;
441 qb_loop_timer_add(loop
, QB_LOOP_LOW
, 1000, NULL
, timer_job
, &th
);
445 cfs_debug("event loop finished - exit worker thread");
/*
 * Start the IPC server.
 *
 * Returns FALSE right away when a loop or worker already exists or when db is
 * NULL. Otherwise it clears terminate_server, allocates the reply buffer,
 * creates the qb loop and the "pve2" IPC service, installs the poll handlers,
 * spawns the worker thread and blocks until server_started becomes non-zero.
 *
 * NOTE(review): the storing of db, the failure conditions and clean-up of the
 * error paths, and the return statements are not visible in this listing.
 */
450 gboolean
server_start(memdb_t
*db
)
452 g_return_val_if_fail(loop
== NULL
, FALSE
);
453 g_return_val_if_fail(worker
== NULL
, FALSE
);
454 g_return_val_if_fail(db
!= NULL
, FALSE
);
456 terminate_server
= 0;
/* reply buffer with 8192*8 bytes pre-allocated */
461 outbuf
= g_string_sized_new(8192*8);
463 if (!(loop
= qb_loop_create())) {
464 cfs_critical("cant create event loop");
/* service name "pve2", service id 1, transport type QB_IPC_SHM */
468 s1
= qb_ipcs_create("pve2", 1, QB_IPC_SHM
, &service_handlers
);
470 cfs_critical("qb_ipcs_create failed: %s", strerror(errno
));
473 qb_ipcs_poll_handlers_set(s1
, &poll_handlers
);
475 worker
= g_thread_new ("server", worker_thread
, NULL
);
/* block until server_started is set; timer_job() signals server_started_cond */
477 g_mutex_lock (&server_started_mutex
);
478 while (!server_started
)
479 g_cond_wait (&server_started_cond
, &server_started_mutex
);
480 g_mutex_unlock (&server_started_mutex
);
482 cfs_debug("server started");
/*
 * Stop the IPC server.
 *
 * Sets terminate_server under the mutex, waits on server_stopped_cond until
 * server_started is zero, joins the worker thread, then destroys the qb loop
 * and frees the reply buffer together with its character data.
 *
 * NOTE(review): any resetting of the file-scope pointers after they are
 * released is not visible in this listing.
 */
487 void server_stop(void)
489 cfs_debug("server stop");
/* request termination; timer_job() notices the flag and signals server_stopped_cond */
491 g_mutex_lock (&server_started_mutex
);
492 terminate_server
= 1;
493 while (server_started
)
494 g_cond_wait (&server_stopped_cond
, &server_started_mutex
);
495 g_mutex_unlock (&server_started_mutex
);
498 g_thread_join(worker
);
502 cfs_debug("worker thread finished");
505 qb_loop_destroy(loop
);
511 g_string_free(outbuf
, TRUE
);