2 Copyright (C) 2010 Proxmox Server Solutions GmbH
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 Author: Dietmar Maurer <dietmar@proxmox.com>
21 #define G_LOG_DOMAIN "ipcs"
25 #endif /* HAVE_CONFIG_H */
30 #include <sys/syslog.h>
33 #include <qb/qbdefs.h>
34 #include <qb/qbutil.h>
35 #include <qb/qbloop.h>
36 #include <qb/qbipcs.h>
40 #include "cfs-utils.h"
41 #include "cfs-ipc-ops.h"
/*
 * File-scope state of the IPC server.
 *
 *  worker  - thread handle created in server_start() and joined in
 *            server_stop().
 *  loop    - libqb main loop; all poll/job/timer registrations in this
 *            file go through it.
 *  s1      - the libqb IPC service created with qb_ipcs_create().
 *  outbuf  - response buffer; truncated to length 0 at the start of
 *            every request and filled by the request handlers.
 *  memdb   - database handle passed to memdb_read() and
 *            cfs_create_guest_conf_property_msg().
 *            NOTE(review): presumably assigned from the server_start()
 *            argument; the assignment is not visible in this listing.
 *
 * server_started / terminate_server are the start/stop handshake flags
 * between the caller thread and the worker thread; both are accessed
 * with server_started_mutex held and signalled through the two
 * condition variables below.
 */
46 static GThread
*worker
;
47 static qb_loop_t
*loop
;
48 static qb_ipcs_service_t
* s1
;
49 static GString
*outbuf
;
50 static memdb_t
*memdb
;
52 static int server_started
= 0; /* protect with server_started_mutex */
53 static int terminate_server
= 0; /* protect with server_started_mutex */
/* signalled by timer_job(); server_start() waits on it */
54 static GCond server_started_cond
;
/* signalled by timer_job() on termination; server_stop() waits on it */
55 static GCond server_stopped_cond
;
56 static GMutex server_started_mutex
;
/*
 * Wire layouts of the typed IPC requests. Every layout starts with the
 * generic libqb request header, so a request can first be inspected as
 * a struct qb_ipc_request_header and then cast to its specific type.
 *
 * NOTE(review): only the leading req_header member and the typedef
 * names are visible in this listing. The members used by
 * s1_msg_process_fn() (name, nodename, priority, ident_len, tag_len,
 * data, max_entries, vmid, property, token) are declared on lines that
 * are not shown; confirm their types and order against the full file.
 */
/* CFS_IPC_SET_STATUS request */
60 struct qb_ipc_request_header req_header
;
62 } cfs_status_update_request_header_t
;
/* CFS_IPC_GET_STATUS request */
65 struct qb_ipc_request_header req_header
;
68 } cfs_status_get_request_header_t
;
/* CFS_IPC_LOG_CLUSTER_MSG request */
71 struct qb_ipc_request_header req_header
;
76 } cfs_log_msg_request_header_t
;
/* CFS_IPC_GET_CLUSTER_LOG request */
79 struct qb_ipc_request_header req_header
;
84 } cfs_log_get_request_header_t
;
/* CFS_IPC_GET_GUEST_CONFIG_PROPERTY request */
87 struct qb_ipc_request_header req_header
;
90 } cfs_guest_config_propery_get_request_header_t
;
/* CFS_IPC_VERIFY_TOKEN request */
93 struct qb_ipc_request_header req_header
;
95 } cfs_verify_token_request_header_t
;
/*
 * Connection-accept handler (registered as .connection_accept).
 *
 * A client is admitted when it is uid 0 / gid 0, or when its gid equals
 * cfs.gid. For an admitted client a zero-initialised struct s1_context
 * is allocated, filled in and attached to the connection with
 * qb_ipcs_context_set(). Any other client is logged as a bad user.
 *
 * NOTE(review): the remaining parameters and the return statements are
 * not visible in this listing.
 */
104 static int32_t s1_connection_accept_fn(
105 qb_ipcs_connection_t
*c
,
109 if ((uid
== 0 && gid
== 0) || (gid
== cfs
.gid
)) {
110 cfs_debug("authenticated connection %d/%d", uid
, gid
);
111 struct s1_context
*ctx
= g_new0(struct s1_context
, 1);
/* read_only is set whenever the client's gid equals cfs.gid */
114 ctx
->read_only
= (gid
== cfs
.gid
);
/* remember the client's pid; it is used later when packing log entries */
116 struct qb_ipcs_connection_stats stats
;
117 qb_ipcs_connection_stats_get(c
, &stats
, QB_FALSE
);
118 ctx
->client_pid
= stats
.client_pid
;
120 qb_ipcs_context_set(c
, ctx
);
123 cfs_critical("connection from bad user %d! - rejected", uid
);
/*
 * Connection-created handler (registered as .connection_created).
 * Fetches the service-wide statistics of s1 and writes the active and
 * closed connection counters to the debug log. The connection argument
 * is not used in the visible lines.
 */
127 static void s1_connection_created_fn(
128 qb_ipcs_connection_t
*c
)
130 struct qb_ipcs_stats srv_stats
;
/* QB_FALSE is the third argument of qb_ipcs_stats_get (presumably "do not reset the counters"; confirm against libqb) */
132 qb_ipcs_stats_get(s1
, &srv_stats
, QB_FALSE
);
134 cfs_debug("Connection created > active:%d > closed:%d",
135 srv_stats
.active_connections
,
136 srv_stats
.closed_connections
);
/*
 * Connection-destroyed handler (registered as .connection_destroyed).
 * Logs the event and looks up the per-connection context that
 * s1_connection_accept_fn() attached.
 *
 * NOTE(review): the body of the if statement is not visible in this
 * listing; presumably it releases the context allocated with g_new0()
 * in the accept handler. Confirm against the full file.
 */
139 static void s1_connection_destroyed_fn(
140 qb_ipcs_connection_t
*c
)
142 cfs_debug("connection about to be freed");
145 if ((ctx
= qb_ipcs_context_get(c
)))
/*
 * Connection-closed handler (registered as .connection_closed).
 * Reads the connection statistics and writes the client's pid to the
 * debug log.
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
150 static int32_t s1_connection_closed_fn(
151 qb_ipcs_connection_t
*c
)
153 struct qb_ipcs_connection_stats stats
;
155 qb_ipcs_connection_stats_get(c
, &stats
, QB_FALSE
);
157 cfs_debug("Connection to pid:%d destroyed", stats
.client_pid
);
/*
 * Request dispatcher (registered as .msg_process).
 *
 * Interprets the incoming buffer as a libqb request header, looks up
 * the per-connection context, and dispatches on the request id. Each
 * branch validates the request size for its own layout, forces
 * 0-termination of embedded strings where needed, and calls the
 * matching cfs_* / memdb_* helper. Helpers that produce data append it
 * to the shared outbuf, which is emptied before dispatch. Afterwards a
 * response header (id, size, error = result) plus the optional payload
 * is sent back with qb_ipcs_response_sendv().
 *
 * result starts as -ECHRNG and is overwritten by the branches that
 * assign it.
 *
 * NOTE(review): many lines of this function (else branches, error
 * assignments, declarations of tmp/ident/resp/iov_len/linelen, the
 * return statement) are not visible in this listing; the comments
 * below describe only what the visible lines establish.
 */
162 static int32_t s1_msg_process_fn(
163 qb_ipcs_connection_t
*c
,
167 struct qb_ipc_request_header
*req_pt
=
168 (struct qb_ipc_request_header
*)data
;
170 struct s1_context
*ctx
= (struct s1_context
*)qb_ipcs_context_get(c
);
/* presumably reached only when ctx is NULL (the check itself is not shown) */
173 cfs_critical("qb_ipcs_context_get failed");
174 qb_ipcs_disconnect(c
);
178 int32_t request_id
__attribute__ ((aligned(8))) = req_pt
->id
;
179 int32_t request_size
__attribute__ ((aligned(8))) = req_pt
->size
;
180 cfs_debug("process msg:%d, size:%d", request_id
, request_size
);
/* outbuf is shared between requests: start every request with an empty buffer */
184 g_string_truncate(outbuf
, 0);
186 int32_t result
= -ECHRNG
;
/* --- requests without payload: size must equal the bare request header --- */
187 if (request_id
== CFS_IPC_GET_FS_VERSION
) {
189 if (request_size
!= sizeof(struct qb_ipc_request_header
)) {
192 result
= cfs_create_version_msg(outbuf
);
195 } else if (request_id
== CFS_IPC_GET_CLUSTER_INFO
) {
197 if (request_size
!= sizeof(struct qb_ipc_request_header
)) {
200 result
= cfs_create_memberlist_msg(outbuf
);
203 } else if (request_id
== CFS_IPC_GET_GUEST_LIST
) {
205 if (request_size
!= sizeof(struct qb_ipc_request_header
)) {
208 result
= cfs_create_vmlist_msg(outbuf
);
/* --- set status: fixed header followed by datasize bytes of payload --- */
210 } else if (request_id
== CFS_IPC_SET_STATUS
) {
212 cfs_status_update_request_header_t
*rh
=
213 (cfs_status_update_request_header_t
*)data
;
215 int datasize
= request_size
- sizeof(cfs_status_update_request_header_t
);
/* read-only connections and undersized requests take their own branches (bodies not shown) */
217 if (ctx
->read_only
) {
219 } else if (datasize
< 0) {
222 /* make sure name is 0 terminated */
223 rh
->name
[sizeof(rh
->name
) - 1] = 0;
225 char *dataptr
= (char*) data
+ sizeof(cfs_status_update_request_header_t
);
227 result
= cfs_status_set(rh
->name
, dataptr
, datasize
);
/* --- get status: looked up by nodename and name --- */
229 } else if (request_id
== CFS_IPC_GET_STATUS
) {
231 cfs_status_get_request_header_t
*rh
=
232 (cfs_status_get_request_header_t
*)data
;
234 int datasize
= request_size
- sizeof(cfs_status_get_request_header_t
);
239 /* make sure all names are 0 terminated */
240 rh
->name
[sizeof(rh
->name
) - 1] = 0;
241 rh
->nodename
[sizeof(rh
->nodename
) - 1] = 0;
243 result
= cfs_create_status_msg(outbuf
, rh
->nodename
, rh
->name
);
/* --- get config: the path string directly follows the request header --- */
245 } else if (request_id
== CFS_IPC_GET_CONFIG
) {
247 int pathlen
= request_size
- sizeof(struct qb_ipc_request_header
);
252 /* make sure path is 0 terminated */
253 ((char *)data
)[request_size
- 1] = 0;
254 char *path
= (char*) data
+ sizeof(struct qb_ipc_request_header
);
/* read-only connections are checked against path_is_private() before the read */
256 if (ctx
->read_only
&& path_is_private(path
)) {
/* memdb_read() result is used both as status and as the byte count appended below */
260 result
= memdb_read(memdb
, path
, &tmp
);
262 g_string_append_len(outbuf
, tmp
, result
);
/* --- cluster log message: data[] holds ident, tag and message back to back --- */
267 } else if (request_id
== CFS_IPC_LOG_CLUSTER_MSG
) {
269 cfs_log_msg_request_header_t
*rh
=
270 (cfs_log_msg_request_header_t
*)data
;
272 int datasize
= request_size
- G_STRUCT_OFFSET(cfs_log_msg_request_header_t
, data
);
273 int msg_len
= datasize
- rh
->ident_len
- rh
->tag_len
;
275 if (ctx
->read_only
) {
277 } else if (msg_len
< 1) {
280 char *msg
= rh
->data
;
/*
 * Each of the three strings must end in a 0 byte.
 * NOTE(review): if ident_len (or ident_len + tag_len) is 0 the index
 * becomes -1; no guard for that is visible here.
 * NOTE(review): the third test indexes data[request_size], i.e. one
 * byte past the request, while every other branch in this function uses
 * request_size - 1. This looks like an off-by-one out-of-bounds read
 * unless the transport guarantees a trailing byte; confirm.
 */
281 if ((msg
[rh
->ident_len
- 1] == 0) &&
282 (msg
[rh
->ident_len
+ rh
->tag_len
- 1] == 0) &&
283 (((char *)data
)[request_size
] == 0)) {
286 char *tag
= msg
+ rh
->ident_len
;
287 msg
= msg
+ rh
->ident_len
+ rh
->tag_len
;
289 time_t ctime
= time(NULL
);
/* the log entry is built in a stack buffer of CLOG_MAX_ENTRY_SIZE bytes */
290 clog_entry_t
*entry
= (clog_entry_t
*)alloca(CLOG_MAX_ENTRY_SIZE
);
291 if (clog_pack(entry
, cfs
.nodename
, ident
, tag
, ctx
->client_pid
,
292 ctime
, rh
->priority
, msg
)) {
293 cfs_cluster_log(entry
);
/* --- cluster log dump: user string follows the fixed header --- */
302 } else if (request_id
== CFS_IPC_GET_CLUSTER_LOG
) {
304 cfs_log_get_request_header_t
*rh
=
305 (cfs_log_get_request_header_t
*)data
;
307 int userlen
= request_size
- sizeof(cfs_log_get_request_header_t
);
312 /* make sure user string is 0 terminated */
313 ((char *)data
)[request_size
- 1] = 0;
314 char *user
= (char*) data
+ sizeof(cfs_log_get_request_header_t
);
/* a max_entries of 0 falls back to 50 entries */
316 uint32_t max
= rh
->max_entries
? rh
->max_entries
: 50;
317 cfs_cluster_log_dump(outbuf
, user
, max
);
/* --- RRD dump: no payload allowed --- */
320 } else if (request_id
== CFS_IPC_GET_RRD_DUMP
) {
322 if (request_size
!= sizeof(struct qb_ipc_request_header
)) {
325 cfs_rrd_dump(outbuf
);
/* --- guest config property: vmid plus property name --- */
328 } else if (request_id
== CFS_IPC_GET_GUEST_CONFIG_PROPERTY
) {
330 cfs_guest_config_propery_get_request_header_t
*rh
=
331 (cfs_guest_config_propery_get_request_header_t
*) data
;
333 int proplen
= request_size
- G_STRUCT_OFFSET(cfs_guest_config_propery_get_request_header_t
, property
);
/* vmid 0 passes both vmid checks; 1..99 is rejected as out of range; >= 100 must exist in the vmlist */
336 if (rh
->vmid
< 100 && rh
->vmid
!= 0) {
337 cfs_debug("vmid out of range %u", rh
->vmid
);
339 } else if (rh
->vmid
>= 100 && !vmlist_vm_exists(rh
->vmid
)) {
341 } else if (proplen
<= 0) {
342 cfs_debug("proplen <= 0, %d", proplen
);
345 ((char *)data
)[request_size
- 1] = 0; // ensure property is 0 terminated
347 cfs_debug("cfs_get_guest_config_property: basic valid checked, do request");
349 result
= cfs_create_guest_conf_property_msg(outbuf
, memdb
, rh
->property
, rh
->vmid
);
/* --- token verification: match the token against a full line of priv/token.cfg --- */
351 } else if (request_id
== CFS_IPC_VERIFY_TOKEN
) {
353 cfs_verify_token_request_header_t
*rh
= (cfs_verify_token_request_header_t
*) data
;
/* tokenlen excludes the final byte of the request (expected to be the 0 terminator, checked below) */
354 int tokenlen
= request_size
- G_STRUCT_OFFSET(cfs_verify_token_request_header_t
, token
) - 1;
357 cfs_debug("tokenlen <= 0, %d", tokenlen
);
359 } else if (memchr(rh
->token
, '\n', tokenlen
) != NULL
) {
360 cfs_debug("token contains newline");
362 } else if (rh
->token
[tokenlen
] != '\0') {
363 cfs_debug("token not NULL-terminated");
365 } else if (strnlen(rh
->token
, tokenlen
) != tokenlen
) {
366 cfs_debug("token contains NULL-byte");
369 cfs_debug("cfs_verify_token: basic validity checked, reading token.cfg");
371 int bytes_read
= memdb_read(memdb
, "priv/token.cfg", &tmp
);
/* a negative memdb_read() result is treated as an empty file */
372 size_t remaining
= bytes_read
> 0 ? bytes_read
: 0;
/* NOTE(review): size_t vs int comparison; tokenlen is presumably known to be > 0 here (the check line is not shown) */
373 if (tmp
!= NULL
&& remaining
>= tokenlen
) {
374 const char *line
= (char *) tmp
;
375 const char *next_line
;
376 const char *const end
= line
+ remaining
;
/* walk the file line by line; a line matches only if its length equals tokenlen and its bytes equal the token */
379 while (line
!= NULL
) {
380 next_line
= memchr(line
, '\n', remaining
);
381 linelen
= next_line
== NULL
? remaining
: next_line
- line
;
382 if (linelen
== tokenlen
&& strncmp(line
, rh
->token
, linelen
) == 0) {
389 remaining
= end
- line
;
397 cfs_debug("token: token.cfg does not exist - ENOENT");
403 cfs_debug("process result %d", result
);
/* --- build and send the response: iov[0] is the header, iov[1] the optional payload --- */
411 struct iovec iov
[iov_len
];
412 struct qb_ipc_response_header res_header
;
/* payload length is outbuf->len when a payload pointer is set, otherwise 0 */
414 int resp_data_len
= resp
? outbuf
->len
: 0;
416 res_header
.id
= request_id
;
417 res_header
.size
= sizeof(res_header
) + resp_data_len
;
418 res_header
.error
= result
;
420 iov
[0].iov_base
= (char *)&res_header
;
421 iov
[0].iov_len
= sizeof(res_header
);
422 iov
[1].iov_base
= resp
;
423 iov
[1].iov_len
= resp_data_len
;
425 ssize_t res
= qb_ipcs_response_sendv(c
, iov
, iov_len
);
/* presumably the send-failure path (the condition on res is not shown): log and drop the client */
427 cfs_critical("qb_ipcs_response_send: %s", strerror(errno
));
428 qb_ipcs_disconnect(c
);
/*
 * Poll-handler glue (registered as .job_add): forwards the job to the
 * file's libqb main loop and returns qb_loop_job_add()'s result.
 */
434 static int32_t my_job_add(
435 enum qb_loop_priority p
,
437 qb_loop_job_dispatch_fn fn
)
439 return qb_loop_job_add(loop
, p
, data
, fn
);
/*
 * Poll-handler glue (registered as .dispatch_add): registers fd with
 * the requested events on the file's libqb main loop and returns
 * qb_loop_poll_add()'s result.
 */
442 static int32_t my_dispatch_add(
443 enum qb_loop_priority p
,
447 qb_ipcs_dispatch_fn_t fn
)
449 return qb_loop_poll_add(loop
, p
, fd
, evts
, data
, fn
);
/*
 * Poll-handler glue (registered as .dispatch_mod): forwards a change
 * of an existing fd registration to the file's libqb main loop and
 * returns qb_loop_poll_mod()'s result.
 */
452 static int32_t my_dispatch_mod(
453 enum qb_loop_priority p
,
457 qb_ipcs_dispatch_fn_t fn
)
459 return qb_loop_poll_mod(loop
, p
, fd
, evts
, data
, fn
);
/*
 * Poll-handler glue (registered as .dispatch_del): removes fd from the
 * file's libqb main loop and returns qb_loop_poll_del()'s result.
 */
462 static int32_t my_dispatch_del(
465 return qb_loop_poll_del(loop
, fd
);
/*
 * Connection life-cycle and message callbacks of the IPC service;
 * passed to qb_ipcs_create() in server_start().
 */
468 static struct qb_ipcs_service_handlers service_handlers
= {
469 .connection_accept
= s1_connection_accept_fn
,
470 .connection_created
= s1_connection_created_fn
,
471 .msg_process
= s1_msg_process_fn
,
472 .connection_destroyed
= s1_connection_destroyed_fn
,
473 .connection_closed
= s1_connection_closed_fn
,
/*
 * Main-loop integration callbacks of the IPC service; installed with
 * qb_ipcs_poll_handlers_set() in server_start(). All four forward to
 * the file's qb_loop_t.
 */
476 static struct qb_ipcs_poll_handlers poll_handlers
= {
477 .job_add
= my_job_add
,
478 .dispatch_add
= my_dispatch_add
,
479 .dispatch_mod
= my_dispatch_mod
,
480 .dispatch_del
= my_dispatch_del
,
/*
 * Periodic timer callback running on the loop thread. It implements
 * the start/stop handshake with server_start() / server_stop():
 *
 *  - if terminate_server is set, the IPC service is destroyed and
 *    server_stopped_cond is signalled;
 *  - otherwise, if the server is not yet marked as started,
 *    server_started_cond is signalled.
 *
 * Both flags are examined with server_started_mutex held. The timer
 * is then re-armed with an interval of 1000000000 (one second if, as
 * documented for libqb, the unit is nanoseconds).
 *
 * NOTE(review): the updates of server_started and the local
 * `terminate`, the stopping of the loop, and the condition guarding
 * the re-arm are on lines not visible in this listing.
 */
483 static void timer_job(void *data
)
485 gboolean terminate
= FALSE
;
487 g_mutex_lock (&server_started_mutex
);
489 if (terminate_server
) {
490 cfs_debug ("got terminate request");
496 qb_ipcs_destroy (s1
);
501 g_cond_signal (&server_stopped_cond
);
504 } else if (!server_started
) {
506 g_cond_signal (&server_started_cond
);
509 g_mutex_unlock (&server_started_mutex
);
514 qb_loop_timer_handle th
;
515 qb_loop_timer_add(loop
, QB_LOOP_LOW
, 1000000000, NULL
, timer_job
, &th
);
/*
 * Entry point of the "server" thread created in server_start().
 * Requires that the loop already exists, schedules the first
 * timer_job() with a short delay (1000, versus 1000000000 for the
 * periodic re-arm) and logs when the event loop has finished.
 *
 * NOTE(review): the call that actually runs the loop and the return
 * statement are on lines not visible in this listing.
 */
518 static gpointer
worker_thread(gpointer data
)
520 g_return_val_if_fail(loop
!= NULL
, NULL
);
522 cfs_debug("start event loop");
526 qb_loop_timer_handle th
;
527 qb_loop_timer_add(loop
, QB_LOOP_LOW
, 1000, NULL
, timer_job
, &th
);
531 cfs_debug("event loop finished - exit worker thread");
/*
 * Start the IPC server.
 *
 * Preconditions (checked with g_return_val_if_fail, returning FALSE):
 * no loop and no worker thread exist yet, and db is not NULL.
 *
 * Creates the response buffer (initial capacity 8192*8 bytes), the
 * libqb loop and the "pve2" shared-memory IPC service, installs the
 * poll handlers, spawns the worker thread and then blocks until the
 * worker's timer_job() reports that the server has started.
 *
 * NOTE(review): the storing of db, the failure returns after the two
 * cfs_critical() calls, the call that starts the service, and the
 * final return value are on lines not visible in this listing.
 */
536 gboolean
server_start(memdb_t
*db
)
538 g_return_val_if_fail(loop
== NULL
, FALSE
);
539 g_return_val_if_fail(worker
== NULL
, FALSE
);
540 g_return_val_if_fail(db
!= NULL
, FALSE
);
/* clear a terminate request left over from an earlier server_stop() */
542 terminate_server
= 0;
547 outbuf
= g_string_sized_new(8192*8);
549 if (!(loop
= qb_loop_create())) {
550 cfs_critical("cant create event loop");
554 s1
= qb_ipcs_create("pve2", 1, QB_IPC_SHM
, &service_handlers
);
556 cfs_critical("qb_ipcs_create failed: %s", strerror(errno
));
559 qb_ipcs_poll_handlers_set(s1
, &poll_handlers
);
561 worker
= g_thread_new ("server", worker_thread
, NULL
);
/* wait for the worker; the loop re-checks the flag because g_cond_wait may wake spuriously */
563 g_mutex_lock (&server_started_mutex
);
564 while (!server_started
)
565 g_cond_wait (&server_started_cond
, &server_started_mutex
);
566 g_mutex_unlock (&server_started_mutex
);
568 cfs_debug("server started");
/*
 * Stop the IPC server.
 *
 * Sets terminate_server under the mutex and waits on
 * server_stopped_cond until server_started is cleared, then joins the
 * worker thread, destroys the loop and frees the response buffer
 * together with its character data.
 *
 * NOTE(review): any NULL checks around the join/destroy/free calls and
 * the resetting of the globals are on lines not visible in this
 * listing. The wait loop only terminates if timer_job() clears
 * server_started; that assignment is also not visible here.
 */
573 void server_stop(void)
575 cfs_debug("server stop");
577 g_mutex_lock (&server_started_mutex
);
578 terminate_server
= 1;
579 while (server_started
)
580 g_cond_wait (&server_stopped_cond
, &server_started_mutex
);
581 g_mutex_unlock (&server_started_mutex
);
584 g_thread_join(worker
);
588 cfs_debug("worker thread finished");
591 qb_loop_destroy(loop
);
597 g_string_free(outbuf
, TRUE
);