]> git.proxmox.com Git - systemd.git/blob - src/journal/journald-stream.c
Imported Upstream version 220
[systemd.git] / src / journal / journald-stream.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2011 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <unistd.h>
23 #include <stddef.h>
24
25 #ifdef HAVE_SELINUX
26 #include <selinux/selinux.h>
27 #endif
28
29 #include "sd-event.h"
30 #include "sd-daemon.h"
31 #include "socket-util.h"
32 #include "selinux-util.h"
33 #include "mkdir.h"
34 #include "fileio.h"
35 #include "journald-server.h"
36 #include "journald-stream.h"
37 #include "journald-syslog.h"
38 #include "journald-kmsg.h"
39 #include "journald-console.h"
40 #include "journald-wall.h"
41
/* Upper bound on the number of stdout stream objects a server keeps at once;
 * new connections and restored streams beyond this count are refused. */
#define STDOUT_STREAMS_MAX 4096
43
/* Which line of the connection header a stream is waiting for next. A client
 * sends one line per setting, in exactly this order, before the stream reaches
 * STDOUT_STREAM_RUNNING and its lines are treated as log messages. A freshly
 * zero-allocated stream starts out in STDOUT_STREAM_IDENTIFIER. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_IDENTIFIER = 0,      /* expecting the syslog identifier line */
        STDOUT_STREAM_UNIT_ID,             /* expecting the unit name line */
        STDOUT_STREAM_PRIORITY,            /* expecting the default priority line */
        STDOUT_STREAM_LEVEL_PREFIX,        /* expecting the "parse level prefix" boolean */
        STDOUT_STREAM_FORWARD_TO_SYSLOG,   /* expecting the forward-to-syslog boolean */
        STDOUT_STREAM_FORWARD_TO_KMSG,     /* expecting the forward-to-kmsg boolean */
        STDOUT_STREAM_FORWARD_TO_CONSOLE,  /* expecting the forward-to-console boolean */
        STDOUT_STREAM_RUNNING,             /* header complete, lines are log payload */
} StdoutStreamState;
54
55 struct StdoutStream {
56 Server *server;
57 StdoutStreamState state;
58
59 int fd;
60
61 struct ucred ucred;
62 #ifdef HAVE_SELINUX
63 security_context_t security_context;
64 #endif
65
66 char *identifier;
67 char *unit_id;
68 int priority;
69 bool level_prefix:1;
70 bool forward_to_syslog:1;
71 bool forward_to_kmsg:1;
72 bool forward_to_console:1;
73
74 bool fdstore:1;
75
76 char buffer[LINE_MAX+1];
77 size_t length;
78
79 sd_event_source *event_source;
80
81 char *state_file;
82
83 LIST_FIELDS(StdoutStream, stdout_stream);
84 };
85
86 void stdout_stream_free(StdoutStream *s) {
87 if (!s)
88 return;
89
90 if (s->server) {
91 assert(s->server->n_stdout_streams > 0);
92 s->server->n_stdout_streams --;
93 LIST_REMOVE(stdout_stream, s->server->stdout_streams, s);
94 }
95
96 if (s->event_source) {
97 sd_event_source_set_enabled(s->event_source, SD_EVENT_OFF);
98 s->event_source = sd_event_source_unref(s->event_source);
99 }
100
101 safe_close(s->fd);
102
103 #ifdef HAVE_SELINUX
104 if (s->security_context)
105 freecon(s->security_context);
106 #endif
107
108 free(s->identifier);
109 free(s->unit_id);
110 free(s->state_file);
111
112 free(s);
113 }
114
115 DEFINE_TRIVIAL_CLEANUP_FUNC(StdoutStream*, stdout_stream_free);
116
117 static void stdout_stream_destroy(StdoutStream *s) {
118 if (!s)
119 return;
120
121 if (s->state_file)
122 unlink(s->state_file);
123
124 stdout_stream_free(s);
125 }
126
127 static int stdout_stream_save(StdoutStream *s) {
128 _cleanup_free_ char *temp_path = NULL;
129 _cleanup_fclose_ FILE *f = NULL;
130 int r;
131
132 assert(s);
133
134 if (s->state != STDOUT_STREAM_RUNNING)
135 return 0;
136
137 if (!s->state_file) {
138 struct stat st;
139
140 r = fstat(s->fd, &st);
141 if (r < 0)
142 return log_warning_errno(errno, "Failed to stat connected stream: %m");
143
144 /* We use device and inode numbers as identifier for the stream */
145 if (asprintf(&s->state_file, "/run/systemd/journal/streams/%lu:%lu", (unsigned long) st.st_dev, (unsigned long) st.st_ino) < 0)
146 return log_oom();
147 }
148
149 mkdir_p("/run/systemd/journal/streams", 0755);
150
151 r = fopen_temporary(s->state_file, &f, &temp_path);
152 if (r < 0)
153 goto finish;
154
155 fprintf(f,
156 "# This is private data. Do not parse\n"
157 "PRIORITY=%i\n"
158 "LEVEL_PREFIX=%i\n"
159 "FORWARD_TO_SYSLOG=%i\n"
160 "FORWARD_TO_KMSG=%i\n"
161 "FORWARD_TO_CONSOLE=%i\n",
162 s->priority,
163 s->level_prefix,
164 s->forward_to_syslog,
165 s->forward_to_kmsg,
166 s->forward_to_console);
167
168 if (!isempty(s->identifier)) {
169 _cleanup_free_ char *escaped;
170
171 escaped = cescape(s->identifier);
172 if (!escaped) {
173 r = -ENOMEM;
174 goto finish;
175 }
176
177 fprintf(f, "IDENTIFIER=%s\n", escaped);
178 }
179
180 if (!isempty(s->unit_id)) {
181 _cleanup_free_ char *escaped;
182
183 escaped = cescape(s->unit_id);
184 if (!escaped) {
185 r = -ENOMEM;
186 goto finish;
187 }
188
189 fprintf(f, "UNIT=%s\n", escaped);
190 }
191
192 r = fflush_and_check(f);
193 if (r < 0)
194 goto finish;
195
196 if (rename(temp_path, s->state_file) < 0) {
197 r = -errno;
198 goto finish;
199 }
200
201 free(temp_path);
202 temp_path = NULL;
203
204 /* Store the connection fd in PID 1, so that we get it passed
205 * in again on next start */
206 if (!s->fdstore) {
207 sd_pid_notify_with_fds(0, false, "FDSTORE=1", &s->fd, 1);
208 s->fdstore = true;
209 }
210
211 finish:
212 if (temp_path)
213 unlink(temp_path);
214
215 if (r < 0)
216 log_error_errno(r, "Failed to save stream data %s: %m", s->state_file);
217
218 return r;
219 }
220
221 static int stdout_stream_log(StdoutStream *s, const char *p) {
222 struct iovec iovec[N_IOVEC_META_FIELDS + 5];
223 int priority;
224 char syslog_priority[] = "PRIORITY=\0";
225 char syslog_facility[sizeof("SYSLOG_FACILITY=")-1 + DECIMAL_STR_MAX(int) + 1];
226 _cleanup_free_ char *message = NULL, *syslog_identifier = NULL;
227 unsigned n = 0;
228 char *label = NULL;
229 size_t label_len = 0;
230
231 assert(s);
232 assert(p);
233
234 if (isempty(p))
235 return 0;
236
237 priority = s->priority;
238
239 if (s->level_prefix)
240 syslog_parse_priority(&p, &priority, false);
241
242 if (s->forward_to_syslog || s->server->forward_to_syslog)
243 server_forward_syslog(s->server, syslog_fixup_facility(priority), s->identifier, p, &s->ucred, NULL);
244
245 if (s->forward_to_kmsg || s->server->forward_to_kmsg)
246 server_forward_kmsg(s->server, priority, s->identifier, p, &s->ucred);
247
248 if (s->forward_to_console || s->server->forward_to_console)
249 server_forward_console(s->server, priority, s->identifier, p, &s->ucred);
250
251 if (s->server->forward_to_wall)
252 server_forward_wall(s->server, priority, s->identifier, p, &s->ucred);
253
254 IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=stdout");
255
256 syslog_priority[strlen("PRIORITY=")] = '0' + LOG_PRI(priority);
257 IOVEC_SET_STRING(iovec[n++], syslog_priority);
258
259 if (priority & LOG_FACMASK) {
260 xsprintf(syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority));
261 IOVEC_SET_STRING(iovec[n++], syslog_facility);
262 }
263
264 if (s->identifier) {
265 syslog_identifier = strappend("SYSLOG_IDENTIFIER=", s->identifier);
266 if (syslog_identifier)
267 IOVEC_SET_STRING(iovec[n++], syslog_identifier);
268 }
269
270 message = strappend("MESSAGE=", p);
271 if (message)
272 IOVEC_SET_STRING(iovec[n++], message);
273
274 #ifdef HAVE_SELINUX
275 if (s->security_context) {
276 label = (char*) s->security_context;
277 label_len = strlen((char*) s->security_context);
278 }
279 #endif
280
281 server_dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, label, label_len, s->unit_id, priority, 0);
282 return 0;
283 }
284
285 static int stdout_stream_line(StdoutStream *s, char *p) {
286 int r;
287
288 assert(s);
289 assert(p);
290
291 p = strstrip(p);
292
293 switch (s->state) {
294
295 case STDOUT_STREAM_IDENTIFIER:
296 if (isempty(p))
297 s->identifier = NULL;
298 else {
299 s->identifier = strdup(p);
300 if (!s->identifier)
301 return log_oom();
302 }
303
304 s->state = STDOUT_STREAM_UNIT_ID;
305 return 0;
306
307 case STDOUT_STREAM_UNIT_ID:
308 if (s->ucred.uid == 0) {
309 if (isempty(p))
310 s->unit_id = NULL;
311 else {
312 s->unit_id = strdup(p);
313 if (!s->unit_id)
314 return log_oom();
315 }
316 }
317
318 s->state = STDOUT_STREAM_PRIORITY;
319 return 0;
320
321 case STDOUT_STREAM_PRIORITY:
322 r = safe_atoi(p, &s->priority);
323 if (r < 0 || s->priority < 0 || s->priority > 999) {
324 log_warning("Failed to parse log priority line.");
325 return -EINVAL;
326 }
327
328 s->state = STDOUT_STREAM_LEVEL_PREFIX;
329 return 0;
330
331 case STDOUT_STREAM_LEVEL_PREFIX:
332 r = parse_boolean(p);
333 if (r < 0) {
334 log_warning("Failed to parse level prefix line.");
335 return -EINVAL;
336 }
337
338 s->level_prefix = !!r;
339 s->state = STDOUT_STREAM_FORWARD_TO_SYSLOG;
340 return 0;
341
342 case STDOUT_STREAM_FORWARD_TO_SYSLOG:
343 r = parse_boolean(p);
344 if (r < 0) {
345 log_warning("Failed to parse forward to syslog line.");
346 return -EINVAL;
347 }
348
349 s->forward_to_syslog = !!r;
350 s->state = STDOUT_STREAM_FORWARD_TO_KMSG;
351 return 0;
352
353 case STDOUT_STREAM_FORWARD_TO_KMSG:
354 r = parse_boolean(p);
355 if (r < 0) {
356 log_warning("Failed to parse copy to kmsg line.");
357 return -EINVAL;
358 }
359
360 s->forward_to_kmsg = !!r;
361 s->state = STDOUT_STREAM_FORWARD_TO_CONSOLE;
362 return 0;
363
364 case STDOUT_STREAM_FORWARD_TO_CONSOLE:
365 r = parse_boolean(p);
366 if (r < 0) {
367 log_warning("Failed to parse copy to console line.");
368 return -EINVAL;
369 }
370
371 s->forward_to_console = !!r;
372 s->state = STDOUT_STREAM_RUNNING;
373
374 /* Try to save the stream, so that journald can be restarted and we can recover */
375 (void) stdout_stream_save(s);
376 return 0;
377
378 case STDOUT_STREAM_RUNNING:
379 return stdout_stream_log(s, p);
380 }
381
382 assert_not_reached("Unknown stream state");
383 }
384
385 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
386 char *p;
387 size_t remaining;
388 int r;
389
390 assert(s);
391
392 p = s->buffer;
393 remaining = s->length;
394 for (;;) {
395 char *end;
396 size_t skip;
397
398 end = memchr(p, '\n', remaining);
399 if (end)
400 skip = end - p + 1;
401 else if (remaining >= sizeof(s->buffer) - 1) {
402 end = p + sizeof(s->buffer) - 1;
403 skip = remaining;
404 } else
405 break;
406
407 *end = 0;
408
409 r = stdout_stream_line(s, p);
410 if (r < 0)
411 return r;
412
413 remaining -= skip;
414 p += skip;
415 }
416
417 if (force_flush && remaining > 0) {
418 p[remaining] = 0;
419 r = stdout_stream_line(s, p);
420 if (r < 0)
421 return r;
422
423 p += remaining;
424 remaining = 0;
425 }
426
427 if (p > s->buffer) {
428 memmove(s->buffer, p, remaining);
429 s->length = remaining;
430 }
431
432 return 0;
433 }
434
435 static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
436 StdoutStream *s = userdata;
437 ssize_t l;
438 int r;
439
440 assert(s);
441
442 if ((revents|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
443 log_error("Got invalid event from epoll for stdout stream: %"PRIx32, revents);
444 goto terminate;
445 }
446
447 l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
448 if (l < 0) {
449
450 if (errno == EAGAIN)
451 return 0;
452
453 log_warning_errno(errno, "Failed to read from stream: %m");
454 goto terminate;
455 }
456
457 if (l == 0) {
458 stdout_stream_scan(s, true);
459 goto terminate;
460 }
461
462 s->length += l;
463 r = stdout_stream_scan(s, false);
464 if (r < 0)
465 goto terminate;
466
467 return 1;
468
469 terminate:
470 stdout_stream_destroy(s);
471 return 0;
472 }
473
474 static int stdout_stream_install(Server *s, int fd, StdoutStream **ret) {
475 _cleanup_(stdout_stream_freep) StdoutStream *stream = NULL;
476 int r;
477
478 assert(s);
479 assert(fd >= 0);
480
481 stream = new0(StdoutStream, 1);
482 if (!stream)
483 return log_oom();
484
485 stream->fd = -1;
486 stream->priority = LOG_INFO;
487
488 r = getpeercred(fd, &stream->ucred);
489 if (r < 0)
490 return log_error_errno(r, "Failed to determine peer credentials: %m");
491
492 #ifdef HAVE_SELINUX
493 if (mac_selinux_use()) {
494 if (getpeercon(fd, &stream->security_context) < 0 && errno != ENOPROTOOPT)
495 log_error_errno(errno, "Failed to determine peer security context: %m");
496 }
497 #endif
498
499 (void) shutdown(fd, SHUT_WR);
500
501 r = sd_event_add_io(s->event, &stream->event_source, fd, EPOLLIN, stdout_stream_process, stream);
502 if (r < 0)
503 return log_error_errno(r, "Failed to add stream to event loop: %m");
504
505 r = sd_event_source_set_priority(stream->event_source, SD_EVENT_PRIORITY_NORMAL+5);
506 if (r < 0)
507 return log_error_errno(r, "Failed to adjust stdout event source priority: %m");
508
509 stream->fd = fd;
510
511 stream->server = s;
512 LIST_PREPEND(stdout_stream, s->stdout_streams, stream);
513 s->n_stdout_streams ++;
514
515 if (ret)
516 *ret = stream;
517
518 stream = NULL;
519
520 return 0;
521 }
522
523 static int stdout_stream_new(sd_event_source *es, int listen_fd, uint32_t revents, void *userdata) {
524 _cleanup_close_ int fd = -1;
525 Server *s = userdata;
526 int r;
527
528 assert(s);
529
530 if (revents != EPOLLIN) {
531 log_error("Got invalid event from epoll for stdout server fd: %"PRIx32, revents);
532 return -EIO;
533 }
534
535 fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
536 if (fd < 0) {
537 if (errno == EAGAIN)
538 return 0;
539
540 log_error_errno(errno, "Failed to accept stdout connection: %m");
541 return -errno;
542 }
543
544 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
545 log_warning("Too many stdout streams, refusing connection.");
546 return 0;
547 }
548
549 r = stdout_stream_install(s, fd, NULL);
550 if (r < 0)
551 return r;
552
553 fd = -1;
554 return 0;
555 }
556
557 static int stdout_stream_load(StdoutStream *stream, const char *fname) {
558 _cleanup_free_ char
559 *priority = NULL,
560 *level_prefix = NULL,
561 *forward_to_syslog = NULL,
562 *forward_to_kmsg = NULL,
563 *forward_to_console = NULL;
564 int r;
565
566 assert(stream);
567 assert(fname);
568
569 if (!stream->state_file) {
570 stream->state_file = strappend("/run/systemd/journal/streams/", fname);
571 if (!stream->state_file)
572 return log_oom();
573 }
574
575 r = parse_env_file(stream->state_file, NEWLINE,
576 "PRIORITY", &priority,
577 "LEVEL_PREFIX", &level_prefix,
578 "FORWARD_TO_SYSLOG", &forward_to_syslog,
579 "FORWARD_TO_KMSG", &forward_to_kmsg,
580 "FORWARD_TO_CONSOLE", &forward_to_console,
581 "IDENTIFIER", &stream->identifier,
582 "UNIT", &stream->unit_id,
583 NULL);
584 if (r < 0)
585 return log_error_errno(r, "Failed to read: %s", stream->state_file);
586
587 if (priority) {
588 int p;
589
590 p = log_level_from_string(priority);
591 if (p >= 0)
592 stream->priority = p;
593 }
594
595 if (level_prefix) {
596 r = parse_boolean(level_prefix);
597 if (r >= 0)
598 stream->level_prefix = r;
599 }
600
601 if (forward_to_syslog) {
602 r = parse_boolean(forward_to_syslog);
603 if (r >= 0)
604 stream->forward_to_syslog = r;
605 }
606
607 if (forward_to_kmsg) {
608 r = parse_boolean(forward_to_kmsg);
609 if (r >= 0)
610 stream->forward_to_kmsg = r;
611 }
612
613 if (forward_to_console) {
614 r = parse_boolean(forward_to_console);
615 if (r >= 0)
616 stream->forward_to_console = r;
617 }
618
619 return 0;
620 }
621
622 static int stdout_stream_restore(Server *s, const char *fname, int fd) {
623 StdoutStream *stream;
624 int r;
625
626 assert(s);
627 assert(fname);
628 assert(fd >= 0);
629
630 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
631 log_warning("Too many stdout streams, refusing restoring of stream.");
632 return -ENOBUFS;
633 }
634
635 r = stdout_stream_install(s, fd, &stream);
636 if (r < 0)
637 return r;
638
639 stream->state = STDOUT_STREAM_RUNNING;
640 stream->fdstore = true;
641
642 /* Ignore all parsing errors */
643 (void) stdout_stream_load(stream, fname);
644
645 return 0;
646 }
647
648 static int server_restore_streams(Server *s, FDSet *fds) {
649 _cleanup_closedir_ DIR *d = NULL;
650 struct dirent *de;
651 int r;
652
653 d = opendir("/run/systemd/journal/streams");
654 if (!d) {
655 if (errno == ENOENT)
656 return 0;
657
658 return log_warning_errno(errno, "Failed to enumerate /run/systemd/journal/streams: %m");
659 }
660
661 FOREACH_DIRENT(de, d, goto fail) {
662 unsigned long st_dev, st_ino;
663 bool found = false;
664 Iterator i;
665 int fd;
666
667 if (sscanf(de->d_name, "%lu:%lu", &st_dev, &st_ino) != 2)
668 continue;
669
670 FDSET_FOREACH(fd, fds, i) {
671 struct stat st;
672
673 if (fstat(fd, &st) < 0)
674 return log_error_errno(errno, "Failed to stat %s: %m", de->d_name);
675
676 if (S_ISSOCK(st.st_mode) && st.st_dev == st_dev && st.st_ino == st_ino) {
677 found = true;
678 break;
679 }
680 }
681
682 if (!found) {
683 /* No file descriptor? Then let's delete the state file */
684 log_debug("Cannot restore stream file %s", de->d_name);
685 unlinkat(dirfd(d), de->d_name, 0);
686 continue;
687 }
688
689 fdset_remove(fds, fd);
690
691 r = stdout_stream_restore(s, de->d_name, fd);
692 if (r < 0)
693 safe_close(fd);
694 }
695
696 return 0;
697
698 fail:
699 return log_error_errno(errno, "Failed to read streams directory: %m");
700 }
701
702 int server_open_stdout_socket(Server *s, FDSet *fds) {
703 int r;
704
705 assert(s);
706
707 if (s->stdout_fd < 0) {
708 union sockaddr_union sa = {
709 .un.sun_family = AF_UNIX,
710 .un.sun_path = "/run/systemd/journal/stdout",
711 };
712
713 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
714 if (s->stdout_fd < 0)
715 return log_error_errno(errno, "socket() failed: %m");
716
717 unlink(sa.un.sun_path);
718
719 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
720 if (r < 0)
721 return log_error_errno(errno, "bind(%s) failed: %m", sa.un.sun_path);
722
723 (void) chmod(sa.un.sun_path, 0666);
724
725 if (listen(s->stdout_fd, SOMAXCONN) < 0)
726 return log_error_errno(errno, "listen(%s) failed: %m", sa.un.sun_path);
727 } else
728 fd_nonblock(s->stdout_fd, 1);
729
730 r = sd_event_add_io(s->event, &s->stdout_event_source, s->stdout_fd, EPOLLIN, stdout_stream_new, s);
731 if (r < 0)
732 return log_error_errno(r, "Failed to add stdout server fd to event source: %m");
733
734 r = sd_event_source_set_priority(s->stdout_event_source, SD_EVENT_PRIORITY_NORMAL+10);
735 if (r < 0)
736 return log_error_errno(r, "Failed to adjust priority of stdout server event source: %m");
737
738 /* Try to restore streams, but don't bother if this fails */
739 (void) server_restore_streams(s, fds);
740
741 return 0;
742 }