/* pve-cluster: data/src/server.c — libqb-based IPC server for pmxcfs */
1 /*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19 */
20
21 #define G_LOG_DOMAIN "ipcs"
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif /* HAVE_CONFIG_H */
26
27 #include <stdint.h>
28 #include <errno.h>
29 #include <string.h>
30 #include <sys/syslog.h>
31 #include <sys/uio.h>
32
33 #include <qb/qbdefs.h>
34 #include <qb/qbutil.h>
35 #include <qb/qbloop.h>
36 #include <qb/qbipcs.h>
37
38 #include <glib.h>
39
40 #include "cfs-utils.h"
41 #include "status.h"
42 #include "memdb.h"
43 #include "logger.h"
44
/* event-loop thread created by server_start(), joined in server_stop() */
static GThread *worker;
/* main qb event loop; owned by server_start()/server_stop() */
static qb_loop_t *loop;
/* the "pve2" IPC service handle */
static qb_ipcs_service_t* s1;
/* scratch buffer reused for every response payload */
static GString *outbuf;
/* database handle passed in by server_start(); not owned here */
static memdb_t *memdb;

static int server_started = 0; /* protect with server_started_mutex */
static int terminate_server = 0; /* protect with server_started_mutex */
static GCond server_started_cond;
static GCond server_stopped_cond;
static GMutex server_started_mutex;
57
/* request id 4 (status update): fixed header + status name,
 * raw status data follows directly after the struct */
typedef struct {
	struct qb_ipc_request_header req_header;
	char name[256];
} cfs_status_update_request_header_t;

/* request id 5 (status get): status name plus the node to query */
typedef struct {
	struct qb_ipc_request_header req_header;
	char name[256];
	char nodename[256];
} cfs_status_get_request_header_t;

/* request id 7 (cluster log): data[] holds ident, tag and message,
 * back to back, each expected to be 0-terminated */
typedef struct {
	struct qb_ipc_request_header req_header;
	uint8_t priority;
	uint8_t ident_len;
	uint8_t tag_len;
	char data[];
} cfs_log_msg_request_header_t;

/* request id 8 (log dump): max_entries == 0 selects a default;
 * res1..res3 are unused/reserved */
typedef struct {
	struct qb_ipc_request_header req_header;
	uint32_t max_entries;
	uint32_t res1;
	uint32_t res2;
	uint32_t res3;
} cfs_log_get_request_header_t;


/* per-connection state, attached in s1_connection_accept_fn() and
 * released in s1_connection_destroyed_fn() */
struct s1_context {
	int32_t client_pid;
	uid_t uid;
	gid_t gid;
	gboolean read_only; /* TRUE when the client's gid matches cfs.gid */
};
92
93 static int32_t s1_connection_accept_fn(
94 qb_ipcs_connection_t *c,
95 uid_t uid,
96 gid_t gid)
97 {
98 if ((uid == 0 && gid == 0) || (gid == cfs.gid)) {
99 cfs_debug("authenticated connection %d/%d", uid, gid);
100 struct s1_context *ctx = g_new0(struct s1_context, 1);
101 ctx->uid = uid;
102 ctx->gid = gid;
103 ctx->read_only = (gid == cfs.gid);
104
105 struct qb_ipcs_connection_stats stats;
106 qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
107 ctx->client_pid = stats.client_pid;
108
109 qb_ipcs_context_set(c, ctx);
110 return 0;
111 }
112 cfs_critical("connection from bad user %d! - rejected", uid);
113 return 1;
114 }
115
116 static void s1_connection_created_fn(
117 qb_ipcs_connection_t *c)
118 {
119 struct qb_ipcs_stats srv_stats;
120
121 qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE);
122
123 cfs_debug("Connection created > active:%d > closed:%d",
124 srv_stats.active_connections,
125 srv_stats.closed_connections);
126 }
127
128 static void s1_connection_destroyed_fn(
129 qb_ipcs_connection_t *c)
130 {
131 cfs_debug("connection about to be freed");
132
133 gpointer ctx;
134 if ((ctx = qb_ipcs_context_get(c)))
135 g_free(ctx);
136
137 }
138
139 static int32_t s1_connection_closed_fn(
140 qb_ipcs_connection_t *c)
141 {
142 struct qb_ipcs_connection_stats stats;
143
144 qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
145
146 cfs_debug("Connection to pid:%d destroyed", stats.client_pid);
147
148 return 0;
149 }
150
/* Dispatch a single IPC request and send the response.
 *
 * Request ids handled:
 *   1  - version message
 *   2  - cluster member list
 *   3  - VM list
 *   4  - set a status entry (refused for read-only clients)
 *   5  - get a status entry for a node
 *   6  - read a file from memdb (private paths refused for read-only)
 *   7  - append an entry to the cluster log (refused for read-only)
 *   8  - dump the cluster log
 *   10 - dump RRD data
 *
 * Always returns 0; errors are reported to the client via the 'error'
 * field of the response header (negative errno values; -ECHRNG for
 * unknown request ids).
 */
static int32_t s1_msg_process_fn(
	qb_ipcs_connection_t *c,
	void *data,
	size_t size)
{
	struct qb_ipc_request_header *req_pt =
		(struct qb_ipc_request_header *)data;

	struct s1_context *ctx = (struct s1_context *)qb_ipcs_context_get(c);

	if (!ctx) {
		/* no per-connection context - should not happen; drop the client */
		cfs_critical("qb_ipcs_context_get failed");
		qb_ipcs_disconnect(c);
		return 0;
	}

	cfs_debug("process msg:%d, size:%d", req_pt->id, req_pt->size);

	char *resp = NULL;

	/* outbuf is shared scratch space; presumably safe because requests
	 * are dispatched sequentially from the single event-loop thread -
	 * confirm if additional loops are ever added */
	g_string_truncate(outbuf, 0);

	int32_t result = -ECHRNG;
	if (req_pt->id == 1) {

		if (req_pt->size != sizeof(struct qb_ipc_request_header)) {
			result = -EINVAL;
		} else {
			result = cfs_create_version_msg(outbuf);
		}

	} else if (req_pt->id == 2) {

		if (req_pt->size != sizeof(struct qb_ipc_request_header)) {
			result = -EINVAL;
		} else {
			result = cfs_create_memberlist_msg(outbuf);
		}

	} else if (req_pt->id == 3) {

		if (req_pt->size != sizeof(struct qb_ipc_request_header)) {
			result = -EINVAL;
		} else {
			result = cfs_create_vmlist_msg(outbuf);
		}
	} else if (req_pt->id == 4) {

		cfs_status_update_request_header_t *rh =
			(cfs_status_update_request_header_t *)data;

		/* payload length after the fixed header */
		int datasize = req_pt->size - sizeof(cfs_status_update_request_header_t);

		if (ctx->read_only) {
			result = -EPERM;
		} else if (datasize < 0) {
			result = -EINVAL;
		} else {
			/* make sure name is 0 terminated */
			rh->name[sizeof(rh->name) - 1] = 0;

			char *dataptr = data + sizeof(cfs_status_update_request_header_t);

			result = cfs_status_set(rh->name, dataptr, datasize);
		}
	} else if (req_pt->id == 5) {

		cfs_status_get_request_header_t *rh =
			(cfs_status_get_request_header_t *)data;

		int datasize = req_pt->size - sizeof(cfs_status_get_request_header_t);

		if (datasize < 0) {
			result = -EINVAL;
		} else {
			/* make sure all names are 0 terminated */
			rh->name[sizeof(rh->name) - 1] = 0;
			rh->nodename[sizeof(rh->nodename) - 1] = 0;

			result = cfs_create_status_msg(outbuf, rh->nodename, rh->name);
		}
	} else if (req_pt->id == 6) {

		int pathlen = req_pt->size - sizeof(struct qb_ipc_request_header);

		if (pathlen <= 0) {
			result = -EINVAL;
		} else {
			/* make sure path is 0 terminated */
			/* NOTE(review): writes one byte past the declared request
			 * size; presumably the libqb receive buffer has slack for
			 * this - confirm against the qb_ipcs buffer sizing */
			((char *)data)[req_pt->size] = 0;
			char *path = data + sizeof(struct qb_ipc_request_header);

			if (ctx->read_only && path_is_private(path)) {
				result = -EPERM;
			} else {
				gpointer tmp = NULL;
				/* memdb_read returns the byte count and allocates tmp */
				result = memdb_read(memdb, path, &tmp);
				if (result > 0) {
					g_string_append_len(outbuf, tmp, result);
					g_free(tmp);
				}
			}
		}
	} else if (req_pt->id == 7) {

		cfs_log_msg_request_header_t *rh =
			(cfs_log_msg_request_header_t *)data;

		int datasize = req_pt->size - G_STRUCT_OFFSET(cfs_log_msg_request_header_t, data);
		int msg_len = datasize - rh->ident_len - rh->tag_len;

		if (ctx->read_only) {
			result = -EPERM;
		} else if (msg_len < 1) {
			result = -EINVAL;
		} else {
			char *msg = rh->data;
			/* check that ident, tag and the message itself are each
			 * 0-terminated before using them as C strings */
			/* NOTE(review): if ident_len is 0 this reads msg[-1],
			 * i.e. a header byte - looks unintended, confirm */
			if ((msg[rh->ident_len - 1] == 0) &&
			    (msg[rh->ident_len + rh->tag_len - 1] == 0) &&
			    (((char *)data)[req_pt->size] == 0)) {

				char *ident = msg;
				char *tag = msg + rh->ident_len;
				msg = msg + rh->ident_len + rh->tag_len;

				/* note: 'ctime' shadows ctime(3) here */
				time_t ctime = time(NULL);
				clog_entry_t *entry = (clog_entry_t *)alloca(CLOG_MAX_ENTRY_SIZE);
				if (clog_pack(entry, cfs.nodename, ident, tag, ctx->client_pid,
					      ctime, rh->priority, msg)) {
					cfs_cluster_log(entry);
				}

				result = 0;

			} else {
				result = -EINVAL;
			}
		}
	} else if (req_pt->id == 8) {

		cfs_log_get_request_header_t *rh =
			(cfs_log_get_request_header_t *)data;

		int userlen = req_pt->size - sizeof(cfs_log_get_request_header_t);

		if (userlen <= 0) {
			result = -EINVAL;
		} else {
			/* make sure user string is 0 terminated */
			/* NOTE(review): same one-past-end write as in id 6 - confirm */
			((char *)data)[req_pt->size] = 0;
			char *user = data + sizeof(cfs_log_get_request_header_t);

			/* 0 max_entries selects the default of 50 */
			uint32_t max = rh->max_entries ? rh->max_entries : 50;
			cfs_cluster_log_dump(outbuf, user, max);
			result = 0;
		}
	} else if (req_pt->id == 10) {

		if (req_pt->size != sizeof(struct qb_ipc_request_header)) {
			result = -EINVAL;
		} else {
			cfs_rrd_dump(outbuf);
			result = 0;
		}
	}

	cfs_debug("process result %d", result);

	/* non-negative result means success: attach the payload collected
	 * in outbuf and report 0 to the client */
	if (result >= 0) {
		resp = outbuf->str;
		result = 0;
	}

	int iov_len = 2;
	struct iovec iov[iov_len];
	struct qb_ipc_response_header res_header;

	int resp_data_len = resp ? outbuf->len : 0;

	res_header.id = req_pt->id;
	res_header.size = sizeof(res_header) + resp_data_len;
	res_header.error = result;

	iov[0].iov_base = (char *)&res_header;
	iov[0].iov_len = sizeof(res_header);
	iov[1].iov_base = resp;
	iov[1].iov_len = resp_data_len;

	ssize_t res = qb_ipcs_response_sendv(c, iov, iov_len);
	if (res < 0) {
		/* client is unusable if we cannot answer - disconnect it */
		cfs_critical("qb_ipcs_response_send: %s", strerror(errno));
		qb_ipcs_disconnect(c);
	}

	return 0;
}
347
348 static int32_t my_job_add(
349 enum qb_loop_priority p,
350 void *data,
351 qb_loop_job_dispatch_fn fn)
352 {
353 return qb_loop_job_add(loop, p, data, fn);
354 }
355
356 static int32_t my_dispatch_add(
357 enum qb_loop_priority p,
358 int32_t fd,
359 int32_t evts,
360 void *data,
361 qb_ipcs_dispatch_fn_t fn)
362 {
363 return qb_loop_poll_add(loop, p, fd, evts, data, fn);
364 }
365
366 static int32_t my_dispatch_mod(
367 enum qb_loop_priority p,
368 int32_t fd,
369 int32_t evts,
370 void *data,
371 qb_ipcs_dispatch_fn_t fn)
372 {
373 return qb_loop_poll_mod(loop, p, fd, evts, data, fn);
374 }
375
/* remove a file descriptor from the main loop's poll set */
static int32_t my_dispatch_del(
	int32_t fd)
{
	return qb_loop_poll_del(loop, fd);
}
381
/* callbacks wired into the libqb "pve2" IPC service */
static struct qb_ipcs_service_handlers service_handlers = {
	.connection_accept = s1_connection_accept_fn,
	.connection_created = s1_connection_created_fn,
	.msg_process = s1_msg_process_fn,
	.connection_destroyed = s1_connection_destroyed_fn,
	.connection_closed = s1_connection_closed_fn,
};

/* glue so the IPC service drives its polling/jobs through our qb_loop */
static struct qb_ipcs_poll_handlers poll_handlers = {
	.job_add = my_job_add,
	.dispatch_add = my_dispatch_add,
	.dispatch_mod = my_dispatch_mod,
	.dispatch_del = my_dispatch_del,
};
396
/* Periodic job on the event loop: re-arms itself once per second until a
 * terminate request is seen. Also performs the started/stopped handshake
 * with server_start()/server_stop() via server_started_mutex and the two
 * condition variables. */
static void timer_job(void *data)
{
	gboolean terminate = FALSE;

	g_mutex_lock (&server_started_mutex);

	if (terminate_server) {
		cfs_debug ("got terminate request");

		if (loop)
			qb_loop_stop (loop);

		if (s1) {
			qb_ipcs_destroy (s1);
			s1 = 0;
		}
		server_started = 0;

		/* wake up server_stop() waiting on server_stopped_cond */
		g_cond_signal (&server_stopped_cond);

		terminate = TRUE;
	} else if (!server_started) {
		/* first run: tell server_start() the loop is up */
		server_started = 1;
		g_cond_signal (&server_started_cond);
	}

	g_mutex_unlock (&server_started_mutex);

	if (terminate)
		return;

	/* re-arm: 1000000000 ns = 1 second */
	qb_loop_timer_handle th;
	qb_loop_timer_add(loop, QB_LOOP_LOW, 1000000000, NULL, timer_job, &th);
}
431
432 static gpointer worker_thread(gpointer data)
433 {
434 g_return_val_if_fail(loop != NULL, NULL);
435
436 cfs_debug("start event loop");
437
438 qb_ipcs_run(s1);
439
440 qb_loop_timer_handle th;
441 qb_loop_timer_add(loop, QB_LOOP_LOW, 1000, NULL, timer_job, &th);
442
443 qb_loop_run(loop);
444
445 cfs_debug("event loop finished - exit worker thread");
446
447 return NULL;
448 }
449
450 gboolean server_start(memdb_t *db)
451 {
452 g_return_val_if_fail(loop == NULL, FALSE);
453 g_return_val_if_fail(worker == NULL, FALSE);
454 g_return_val_if_fail(db != NULL, FALSE);
455
456 terminate_server = 0;
457 server_started = 0;
458
459 memdb = db;
460
461 outbuf = g_string_sized_new(8192*8);
462
463 if (!(loop = qb_loop_create())) {
464 cfs_critical("cant create event loop");
465 return FALSE;
466 }
467
468 s1 = qb_ipcs_create("pve2", 1, QB_IPC_SHM, &service_handlers);
469 if (s1 == 0) {
470 cfs_critical("qb_ipcs_create failed: %s", strerror(errno));
471 return FALSE;
472 }
473 qb_ipcs_poll_handlers_set(s1, &poll_handlers);
474
475 worker = g_thread_new ("server", worker_thread, NULL);
476
477 g_mutex_lock (&server_started_mutex);
478 while (!server_started)
479 g_cond_wait (&server_started_cond, &server_started_mutex);
480 g_mutex_unlock (&server_started_mutex);
481
482 cfs_debug("server started");
483
484 return TRUE;
485 }
486
487 void server_stop(void)
488 {
489 cfs_debug("server stop");
490
491 g_mutex_lock (&server_started_mutex);
492 terminate_server = 1;
493 while (server_started)
494 g_cond_wait (&server_stopped_cond, &server_started_mutex);
495 g_mutex_unlock (&server_started_mutex);
496
497 if (worker) {
498 g_thread_join(worker);
499 worker = NULL;
500 }
501
502 cfs_debug("worker thread finished");
503
504 if (loop) {
505 qb_loop_destroy(loop);
506
507 loop = NULL;
508 }
509
510 if (outbuf) {
511 g_string_free(outbuf, TRUE);
512 outbuf = NULL;
513 }
514 }