]> git.proxmox.com Git - systemd.git/blame - src/journal-remote/journal-remote-parse.c
Imported Upstream version 217
[systemd.git] / src / journal-remote / journal-remote-parse.c
CommitLineData
60f067b4
JS
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
22#include "journal-remote-parse.h"
23#include "journald-native.h"
24
5eef597e 25#define LINE_CHUNK 8*1024u
60f067b4
JS
26
27void source_free(RemoteSource *source) {
28 if (!source)
29 return;
30
5eef597e 31 if (source->fd >= 0 && !source->passive_fd) {
60f067b4 32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
5eef597e 33 safe_close(source->fd);
60f067b4 34 }
5eef597e 35
60f067b4
JS
36 free(source->name);
37 free(source->buf);
38 iovw_free_contents(&source->iovw);
5eef597e
MP
39
40 log_debug("Writer ref count %u", source->writer->n_ref);
41 writer_unref(source->writer);
42
43 sd_event_source_unref(source->event);
44
60f067b4
JS
45 free(source);
46}
47
5eef597e
MP
48/**
49 * Initialize zero-filled source with given values. On success, takes
50 * ownerhship of fd and writer, otherwise does not touch them.
51 */
52RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
53
54 RemoteSource *source;
55
56 log_debug("Creating source for %sfd:%d (%s)",
57 passive_fd ? "passive " : "", fd, name);
58
59 assert(fd >= 0);
60
61 source = new0(RemoteSource, 1);
62 if (!source)
63 return NULL;
64
65 source->fd = fd;
66 source->passive_fd = passive_fd;
67 source->name = name;
68 source->writer = writer;
69
70 return source;
71}
72
73static char* realloc_buffer(RemoteSource *source, size_t size) {
74 char *b, *old = source->buf;
75
76 b = GREEDY_REALLOC(source->buf, source->size, size);
77 if (!b)
78 return NULL;
79
80 iovw_rebase(&source->iovw, old, source->buf);
81
82 return b;
83}
84
60f067b4 85static int get_line(RemoteSource *source, char **line, size_t *size) {
5eef597e 86 ssize_t n;
60f067b4 87 char *c = NULL;
60f067b4
JS
88
89 assert(source);
90 assert(source->state == STATE_LINE);
5eef597e 91 assert(source->offset <= source->filled);
60f067b4
JS
92 assert(source->filled <= source->size);
93 assert(source->buf == NULL || source->size > 0);
5eef597e 94 assert(source->fd >= 0);
60f067b4 95
5eef597e
MP
96 while (true) {
97 if (source->buf) {
98 size_t start = MAX(source->scanned, source->offset);
60f067b4 99
5eef597e
MP
100 c = memchr(source->buf + start, '\n',
101 source->filled - start);
102 if (c != NULL)
103 break;
104 }
60f067b4 105
5eef597e
MP
106 source->scanned = source->filled;
107 if (source->scanned >= DATA_SIZE_MAX) {
108 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
109 return -E2BIG;
110 }
60f067b4 111
5eef597e
MP
112 if (source->passive_fd)
113 /* we have to wait for some data to come to us */
114 return -EWOULDBLOCK;
60f067b4 115
5eef597e
MP
116 if (source->size - source->filled < LINE_CHUNK &&
117 !realloc_buffer(source,
118 MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
119 return log_oom();
120
121 assert(source->size - source->filled >= LINE_CHUNK ||
122 source->size == ENTRY_SIZE_MAX);
123
124 n = read(source->fd, source->buf + source->filled,
125 source->size - source->filled);
126 if (n < 0) {
127 if (errno != EAGAIN && errno != EWOULDBLOCK)
128 log_error("read(%d, ..., %zd): %m", source->fd,
129 source->size - source->filled);
130 return -errno;
131 } else if (n == 0)
132 return 0;
133
134 source->filled += n;
60f067b4 135 }
5eef597e
MP
136
137 *line = source->buf + source->offset;
138 *size = c + 1 - source->buf - source->offset;
139 source->offset += *size;
60f067b4
JS
140
141 return 1;
142}
143
144int push_data(RemoteSource *source, const char *data, size_t size) {
145 assert(source);
146 assert(source->state != STATE_EOF);
147
5eef597e
MP
148 if (!realloc_buffer(source, source->filled + size)) {
149 log_error("Failed to store received data of size %zu "
150 "(in addition to existing %zu bytes with %zu filled): %s",
151 size, source->size, source->filled, strerror(ENOMEM));
152 return -ENOMEM;
153 }
60f067b4
JS
154
155 memcpy(source->buf + source->filled, data, size);
156 source->filled += size;
157
158 return 0;
159}
160
161static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
60f067b4
JS
162
163 assert(source);
164 assert(source->state == STATE_DATA_START ||
165 source->state == STATE_DATA ||
166 source->state == STATE_DATA_FINISH);
167 assert(size <= DATA_SIZE_MAX);
5eef597e 168 assert(source->offset <= source->filled);
60f067b4
JS
169 assert(source->filled <= source->size);
170 assert(source->buf != NULL || source->size == 0);
171 assert(source->buf == NULL || source->size > 0);
5eef597e 172 assert(source->fd >= 0);
60f067b4
JS
173 assert(data);
174
5eef597e
MP
175 while (source->filled - source->offset < size) {
176 int n;
177
178 if (source->passive_fd)
60f067b4
JS
179 /* we have to wait for some data to come to us */
180 return -EWOULDBLOCK;
181
5eef597e 182 if (!realloc_buffer(source, source->offset + size))
60f067b4
JS
183 return log_oom();
184
185 n = read(source->fd, source->buf + source->filled,
186 source->size - source->filled);
187 if (n < 0) {
188 if (errno != EAGAIN && errno != EWOULDBLOCK)
189 log_error("read(%d, ..., %zd): %m", source->fd,
190 source->size - source->filled);
191 return -errno;
192 } else if (n == 0)
193 return 0;
194
195 source->filled += n;
196 }
197
5eef597e
MP
198 *data = source->buf + source->offset;
199 source->offset += size;
60f067b4
JS
200
201 return 1;
202}
203
204static int get_data_size(RemoteSource *source) {
205 int r;
5eef597e 206 void *data;
60f067b4
JS
207
208 assert(source);
209 assert(source->state == STATE_DATA_START);
210 assert(source->data_size == 0);
211
212 r = fill_fixed_size(source, &data, sizeof(uint64_t));
213 if (r <= 0)
214 return r;
215
216 source->data_size = le64toh( *(uint64_t *) data );
217 if (source->data_size > DATA_SIZE_MAX) {
5eef597e 218 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
60f067b4
JS
219 source->data_size, DATA_SIZE_MAX);
220 return -EINVAL;
221 }
222 if (source->data_size == 0)
223 log_warning("Binary field with zero length");
224
225 return 1;
226}
227
228static int get_data_data(RemoteSource *source, void **data) {
229 int r;
230
231 assert(source);
232 assert(data);
233 assert(source->state == STATE_DATA);
234
235 r = fill_fixed_size(source, data, source->data_size);
236 if (r <= 0)
237 return r;
238
239 return 1;
240}
241
242static int get_data_newline(RemoteSource *source) {
243 int r;
5eef597e 244 char *data;
60f067b4
JS
245
246 assert(source);
247 assert(source->state == STATE_DATA_FINISH);
248
249 r = fill_fixed_size(source, (void**) &data, 1);
250 if (r <= 0)
251 return r;
252
253 assert(data);
254 if (*data != '\n') {
255 log_error("expected newline, got '%c'", *data);
256 return -EINVAL;
257 }
258
259 return 1;
260}
261
262static int process_dunder(RemoteSource *source, char *line, size_t n) {
263 const char *timestamp;
264 int r;
265
266 assert(line);
267 assert(n > 0);
268 assert(line[n-1] == '\n');
269
270 /* XXX: is it worth to support timestamps in extended format?
271 * We don't produce them, but who knows... */
272
273 timestamp = startswith(line, "__CURSOR=");
274 if (timestamp)
275 /* ignore __CURSOR */
276 return 1;
277
278 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
279 if (timestamp) {
280 long long unsigned x;
281 line[n-1] = '\0';
282 r = safe_atollu(timestamp, &x);
283 if (r < 0)
284 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
285 else
286 source->ts.realtime = x;
287 return r < 0 ? r : 1;
288 }
289
290 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
291 if (timestamp) {
292 long long unsigned x;
293 line[n-1] = '\0';
294 r = safe_atollu(timestamp, &x);
295 if (r < 0)
296 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
297 else
298 source->ts.monotonic = x;
299 return r < 0 ? r : 1;
300 }
301
302 timestamp = startswith(line, "__");
303 if (timestamp) {
304 log_notice("Unknown dunder line %s", line);
305 return 1;
306 }
307
308 /* no dunder */
309 return 0;
310}
311
312int process_data(RemoteSource *source) {
313 int r;
314
315 switch(source->state) {
316 case STATE_LINE: {
317 char *line, *sep;
318 size_t n;
319
320 assert(source->data_size == 0);
321
322 r = get_line(source, &line, &n);
323 if (r < 0)
324 return r;
325 if (r == 0) {
326 source->state = STATE_EOF;
327 return r;
328 }
329 assert(n > 0);
330 assert(line[n-1] == '\n');
331
332 if (n == 1) {
5eef597e 333 log_trace("Received empty line, event is ready");
60f067b4
JS
334 return 1;
335 }
336
337 r = process_dunder(source, line, n);
5eef597e 338 if (r != 0)
60f067b4 339 return r < 0 ? r : 0;
60f067b4
JS
340
341 /* MESSAGE=xxx\n
342 or
343 COREDUMP\n
344 LLLLLLLL0011223344...\n
345 */
346 sep = memchr(line, '=', n);
347 if (sep)
348 /* chomp newline */
349 n--;
350 else
351 /* replace \n with = */
352 line[n-1] = '=';
5eef597e 353 log_trace("Received: %.*s", (int) n, line);
60f067b4
JS
354
355 r = iovw_put(&source->iovw, line, n);
356 if (r < 0) {
357 log_error("Failed to put line in iovect");
60f067b4
JS
358 return r;
359 }
360
361 if (!sep)
362 source->state = STATE_DATA_START;
363 return 0; /* continue */
364 }
365
366 case STATE_DATA_START:
367 assert(source->data_size == 0);
368
369 r = get_data_size(source);
5eef597e 370 // log_debug("get_data_size() -> %d", r);
60f067b4
JS
371 if (r < 0)
372 return r;
373 if (r == 0) {
374 source->state = STATE_EOF;
375 return 0;
376 }
377
378 source->state = source->data_size > 0 ?
379 STATE_DATA : STATE_DATA_FINISH;
380
381 return 0; /* continue */
382
383 case STATE_DATA: {
384 void *data;
385
386 assert(source->data_size > 0);
387
388 r = get_data_data(source, &data);
5eef597e 389 // log_debug("get_data_data() -> %d", r);
60f067b4
JS
390 if (r < 0)
391 return r;
392 if (r == 0) {
393 source->state = STATE_EOF;
394 return 0;
395 }
396
397 assert(data);
398
399 r = iovw_put(&source->iovw, data, source->data_size);
400 if (r < 0) {
401 log_error("failed to put binary buffer in iovect");
402 return r;
403 }
404
405 source->state = STATE_DATA_FINISH;
406
407 return 0; /* continue */
408 }
409
410 case STATE_DATA_FINISH:
411 r = get_data_newline(source);
5eef597e 412 // log_debug("get_data_newline() -> %d", r);
60f067b4
JS
413 if (r < 0)
414 return r;
415 if (r == 0) {
416 source->state = STATE_EOF;
417 return 0;
418 }
419
420 source->data_size = 0;
421 source->state = STATE_LINE;
422
423 return 0; /* continue */
424 default:
425 assert_not_reached("wtf?");
426 }
427}
428
5eef597e
MP
429int process_source(RemoteSource *source, bool compress, bool seal) {
430 size_t remain, target;
60f067b4
JS
431 int r;
432
433 assert(source);
5eef597e 434 assert(source->writer);
60f067b4
JS
435
436 r = process_data(source);
437 if (r <= 0)
438 return r;
439
440 /* We have a full event */
5eef597e
MP
441 log_trace("Received a full event from source@%p fd:%d (%s)",
442 source, source->fd, source->name);
60f067b4
JS
443
444 if (!source->iovw.count) {
445 log_warning("Entry with no payload, skipping");
446 goto freeing;
447 }
448
449 assert(source->iovw.iovec);
450 assert(source->iovw.count);
451
5eef597e 452 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
60f067b4
JS
453 if (r < 0)
454 log_error("Failed to write entry of %zu bytes: %s",
455 iovw_size(&source->iovw), strerror(-r));
456 else
457 r = 1;
458
459 freeing:
460 iovw_free_contents(&source->iovw);
5eef597e
MP
461
462 /* possibly reset buffer position */
463 remain = source->filled - source->offset;
464
465 if (remain == 0) /* no brainer */
466 source->offset = source->scanned = source->filled = 0;
467 else if (source->offset > source->size - source->filled &&
468 source->offset > remain) {
469 memcpy(source->buf, source->buf + source->offset, remain);
470 source->offset = source->scanned = 0;
471 source->filled = remain;
472 }
473
474 target = source->size;
475 while (target > 16 * LINE_CHUNK && remain < target / 2)
476 target /= 2;
477 if (target < source->size) {
478 char *tmp;
479
480 tmp = realloc(source->buf, target);
481 if (!tmp)
482 log_warning("Failed to reallocate buffer to (smaller) size %zu",
483 target);
484 else {
485 log_debug("Reallocated buffer from %zu to %zu bytes",
486 source->size, target);
487 source->buf = tmp;
488 source->size = target;
489 }
490 }
491
60f067b4
JS
492 return r;
493}