]>
git.proxmox.com Git - systemd.git/blob - src/journal-remote/journal-remote-parse.c
2 This file is part of systemd.
4 Copyright 2014 Zbigniew Jędrzejewski-Szmek
6 systemd is free software; you can redistribute it and/or modify it
7 under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
11 systemd is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License
17 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 #include "alloc-util.h"
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24 #include "parse-util.h"
25 #include "string-util.h"
27 #define LINE_CHUNK 8*1024u
29 void source_free(RemoteSource
*source
) {
33 if (source
->fd
>= 0 && !source
->passive_fd
) {
34 log_debug("Closing fd:%d (%s)", source
->fd
, source
->name
);
35 safe_close(source
->fd
);
40 iovw_free_contents(&source
->iovw
);
42 log_debug("Writer ref count %i", source
->writer
->n_ref
);
43 writer_unref(source
->writer
);
45 sd_event_source_unref(source
->event
);
46 sd_event_source_unref(source
->buffer_event
);
52 * Initialize zero-filled source with given values. On success, takes
53 * ownerhship of fd and writer, otherwise does not touch them.
55 RemoteSource
* source_new(int fd
, bool passive_fd
, char *name
, Writer
*writer
) {
59 log_debug("Creating source for %sfd:%d (%s)",
60 passive_fd
? "passive " : "", fd
, name
);
64 source
= new0(RemoteSource
, 1);
69 source
->passive_fd
= passive_fd
;
71 source
->writer
= writer
;
76 static char* realloc_buffer(RemoteSource
*source
, size_t size
) {
77 char *b
, *old
= source
->buf
;
79 b
= GREEDY_REALLOC(source
->buf
, source
->size
, size
);
83 iovw_rebase(&source
->iovw
, old
, source
->buf
);
88 static int get_line(RemoteSource
*source
, char **line
, size_t *size
) {
93 assert(source
->state
== STATE_LINE
);
94 assert(source
->offset
<= source
->filled
);
95 assert(source
->filled
<= source
->size
);
96 assert(source
->buf
== NULL
|| source
->size
> 0);
97 assert(source
->fd
>= 0);
101 size_t start
= MAX(source
->scanned
, source
->offset
);
103 c
= memchr(source
->buf
+ start
, '\n',
104 source
->filled
- start
);
109 source
->scanned
= source
->filled
;
110 if (source
->scanned
>= DATA_SIZE_MAX
) {
111 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX
);
115 if (source
->passive_fd
)
116 /* we have to wait for some data to come to us */
119 /* We know that source->filled is at most DATA_SIZE_MAX, so if
120 we reallocate it, we'll increase the size at least a bit. */
121 assert_cc(DATA_SIZE_MAX
< ENTRY_SIZE_MAX
);
122 if (source
->size
- source
->filled
< LINE_CHUNK
&&
123 !realloc_buffer(source
, MIN(source
->filled
+ LINE_CHUNK
, ENTRY_SIZE_MAX
)))
127 assert(source
->size
- source
->filled
>= LINE_CHUNK
||
128 source
->size
== ENTRY_SIZE_MAX
);
131 source
->buf
+ source
->filled
,
132 source
->size
- source
->filled
);
135 log_error_errno(errno
, "read(%d, ..., %zu): %m",
137 source
->size
- source
->filled
);
145 *line
= source
->buf
+ source
->offset
;
146 *size
= c
+ 1 - source
->buf
- source
->offset
;
147 source
->offset
+= *size
;
152 int push_data(RemoteSource
*source
, const char *data
, size_t size
) {
154 assert(source
->state
!= STATE_EOF
);
156 if (!realloc_buffer(source
, source
->filled
+ size
)) {
157 log_error("Failed to store received data of size %zu "
158 "(in addition to existing %zu bytes with %zu filled): %s",
159 size
, source
->size
, source
->filled
, strerror(ENOMEM
));
163 memcpy(source
->buf
+ source
->filled
, data
, size
);
164 source
->filled
+= size
;
169 static int fill_fixed_size(RemoteSource
*source
, void **data
, size_t size
) {
172 assert(source
->state
== STATE_DATA_START
||
173 source
->state
== STATE_DATA
||
174 source
->state
== STATE_DATA_FINISH
);
175 assert(size
<= DATA_SIZE_MAX
);
176 assert(source
->offset
<= source
->filled
);
177 assert(source
->filled
<= source
->size
);
178 assert(source
->buf
!= NULL
|| source
->size
== 0);
179 assert(source
->buf
== NULL
|| source
->size
> 0);
180 assert(source
->fd
>= 0);
183 while (source
->filled
- source
->offset
< size
) {
186 if (source
->passive_fd
)
187 /* we have to wait for some data to come to us */
190 if (!realloc_buffer(source
, source
->offset
+ size
))
193 n
= read(source
->fd
, source
->buf
+ source
->filled
,
194 source
->size
- source
->filled
);
197 log_error_errno(errno
, "read(%d, ..., %zu): %m", source
->fd
,
198 source
->size
- source
->filled
);
206 *data
= source
->buf
+ source
->offset
;
207 source
->offset
+= size
;
212 static int get_data_size(RemoteSource
*source
) {
217 assert(source
->state
== STATE_DATA_START
);
218 assert(source
->data_size
== 0);
220 r
= fill_fixed_size(source
, &data
, sizeof(uint64_t));
224 source
->data_size
= le64toh( *(uint64_t *) data
);
225 if (source
->data_size
> DATA_SIZE_MAX
) {
226 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
227 source
->data_size
, DATA_SIZE_MAX
);
230 if (source
->data_size
== 0)
231 log_warning("Binary field with zero length");
236 static int get_data_data(RemoteSource
*source
, void **data
) {
241 assert(source
->state
== STATE_DATA
);
243 r
= fill_fixed_size(source
, data
, source
->data_size
);
250 static int get_data_newline(RemoteSource
*source
) {
255 assert(source
->state
== STATE_DATA_FINISH
);
257 r
= fill_fixed_size(source
, (void**) &data
, 1);
263 log_error("expected newline, got '%c'", *data
);
270 static int process_dunder(RemoteSource
*source
, char *line
, size_t n
) {
271 const char *timestamp
;
276 assert(line
[n
-1] == '\n');
278 /* XXX: is it worth to support timestamps in extended format?
279 * We don't produce them, but who knows... */
281 timestamp
= startswith(line
, "__CURSOR=");
283 /* ignore __CURSOR */
286 timestamp
= startswith(line
, "__REALTIME_TIMESTAMP=");
288 long long unsigned x
;
290 r
= safe_atollu(timestamp
, &x
);
292 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp
);
294 source
->ts
.realtime
= x
;
295 return r
< 0 ? r
: 1;
298 timestamp
= startswith(line
, "__MONOTONIC_TIMESTAMP=");
300 long long unsigned x
;
302 r
= safe_atollu(timestamp
, &x
);
304 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp
);
306 source
->ts
.monotonic
= x
;
307 return r
< 0 ? r
: 1;
310 timestamp
= startswith(line
, "__");
312 log_notice("Unknown dunder line %s", line
);
320 static int process_data(RemoteSource
*source
) {
323 switch(source
->state
) {
328 assert(source
->data_size
== 0);
330 r
= get_line(source
, &line
, &n
);
334 source
->state
= STATE_EOF
;
338 assert(line
[n
-1] == '\n');
341 log_trace("Received empty line, event is ready");
345 r
= process_dunder(source
, line
, n
);
347 return r
< 0 ? r
: 0;
352 LLLLLLLL0011223344...\n
354 sep
= memchr(line
, '=', n
);
359 r
= iovw_put(&source
->iovw
, line
, n
);
363 /* replace \n with = */
366 source
->field_len
= n
;
367 source
->state
= STATE_DATA_START
;
369 /* we cannot put the field in iovec until we have all data */
372 log_trace("Received: %.*s (%s)", (int) n
, line
, sep
? "text" : "binary");
374 return 0; /* continue */
377 case STATE_DATA_START
:
378 assert(source
->data_size
== 0);
380 r
= get_data_size(source
);
381 // log_debug("get_data_size() -> %d", r);
385 source
->state
= STATE_EOF
;
389 source
->state
= source
->data_size
> 0 ?
390 STATE_DATA
: STATE_DATA_FINISH
;
392 return 0; /* continue */
398 assert(source
->data_size
> 0);
400 r
= get_data_data(source
, &data
);
401 // log_debug("get_data_data() -> %d", r);
405 source
->state
= STATE_EOF
;
411 field
= (char*) data
- sizeof(uint64_t) - source
->field_len
;
412 memmove(field
+ sizeof(uint64_t), field
, source
->field_len
);
414 r
= iovw_put(&source
->iovw
, field
+ sizeof(uint64_t), source
->field_len
+ source
->data_size
);
418 source
->state
= STATE_DATA_FINISH
;
420 return 0; /* continue */
423 case STATE_DATA_FINISH
:
424 r
= get_data_newline(source
);
425 // log_debug("get_data_newline() -> %d", r);
429 source
->state
= STATE_EOF
;
433 source
->data_size
= 0;
434 source
->state
= STATE_LINE
;
436 return 0; /* continue */
438 assert_not_reached("wtf?");
442 int process_source(RemoteSource
*source
, bool compress
, bool seal
) {
443 size_t remain
, target
;
447 assert(source
->writer
);
449 r
= process_data(source
);
453 /* We have a full event */
454 log_trace("Received full event from source@%p fd:%d (%s)",
455 source
, source
->fd
, source
->name
);
457 if (!source
->iovw
.count
) {
458 log_warning("Entry with no payload, skipping");
462 assert(source
->iovw
.iovec
);
463 assert(source
->iovw
.count
);
465 r
= writer_write(source
->writer
, &source
->iovw
, &source
->ts
, compress
, seal
);
467 log_error_errno(r
, "Failed to write entry of %zu bytes: %m",
468 iovw_size(&source
->iovw
));
473 iovw_free_contents(&source
->iovw
);
475 /* possibly reset buffer position */
476 remain
= source
->filled
- source
->offset
;
478 if (remain
== 0) /* no brainer */
479 source
->offset
= source
->scanned
= source
->filled
= 0;
480 else if (source
->offset
> source
->size
- source
->filled
&&
481 source
->offset
> remain
) {
482 memcpy(source
->buf
, source
->buf
+ source
->offset
, remain
);
483 source
->offset
= source
->scanned
= 0;
484 source
->filled
= remain
;
487 target
= source
->size
;
488 while (target
> 16 * LINE_CHUNK
&& remain
< target
/ 2)
490 if (target
< source
->size
) {
493 tmp
= realloc(source
->buf
, target
);
495 log_warning("Failed to reallocate buffer to (smaller) size %zu",
498 log_debug("Reallocated buffer from %zu to %zu bytes",
499 source
->size
, target
);
501 source
->size
= target
;