]> git.proxmox.com Git - systemd.git/blame - src/journal-remote/journal-remote-parse.c
Imported Upstream version 220
[systemd.git] / src / journal-remote / journal-remote-parse.c
CommitLineData
60f067b4
JS
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
22#include "journal-remote-parse.h"
23#include "journald-native.h"
24
5eef597e 25#define LINE_CHUNK 8*1024u
60f067b4
JS
26
27void source_free(RemoteSource *source) {
28 if (!source)
29 return;
30
5eef597e 31 if (source->fd >= 0 && !source->passive_fd) {
60f067b4 32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
5eef597e 33 safe_close(source->fd);
60f067b4 34 }
5eef597e 35
60f067b4
JS
36 free(source->name);
37 free(source->buf);
38 iovw_free_contents(&source->iovw);
5eef597e 39
e735f4d4 40 log_debug("Writer ref count %i", source->writer->n_ref);
5eef597e
MP
41 writer_unref(source->writer);
42
43 sd_event_source_unref(source->event);
e3bff60a 44 sd_event_source_unref(source->buffer_event);
5eef597e 45
60f067b4
JS
46 free(source);
47}
48
5eef597e
MP
49/**
50 * Initialize zero-filled source with given values. On success, takes
51 * ownerhship of fd and writer, otherwise does not touch them.
52 */
53RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
54
55 RemoteSource *source;
56
57 log_debug("Creating source for %sfd:%d (%s)",
58 passive_fd ? "passive " : "", fd, name);
59
60 assert(fd >= 0);
61
62 source = new0(RemoteSource, 1);
63 if (!source)
64 return NULL;
65
66 source->fd = fd;
67 source->passive_fd = passive_fd;
68 source->name = name;
69 source->writer = writer;
70
71 return source;
72}
73
74static char* realloc_buffer(RemoteSource *source, size_t size) {
75 char *b, *old = source->buf;
76
77 b = GREEDY_REALLOC(source->buf, source->size, size);
78 if (!b)
79 return NULL;
80
81 iovw_rebase(&source->iovw, old, source->buf);
82
83 return b;
84}
85
60f067b4 86static int get_line(RemoteSource *source, char **line, size_t *size) {
5eef597e 87 ssize_t n;
60f067b4 88 char *c = NULL;
60f067b4
JS
89
90 assert(source);
91 assert(source->state == STATE_LINE);
5eef597e 92 assert(source->offset <= source->filled);
60f067b4
JS
93 assert(source->filled <= source->size);
94 assert(source->buf == NULL || source->size > 0);
5eef597e 95 assert(source->fd >= 0);
60f067b4 96
5eef597e
MP
97 while (true) {
98 if (source->buf) {
99 size_t start = MAX(source->scanned, source->offset);
60f067b4 100
5eef597e
MP
101 c = memchr(source->buf + start, '\n',
102 source->filled - start);
103 if (c != NULL)
104 break;
105 }
60f067b4 106
5eef597e
MP
107 source->scanned = source->filled;
108 if (source->scanned >= DATA_SIZE_MAX) {
109 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
110 return -E2BIG;
111 }
60f067b4 112
5eef597e
MP
113 if (source->passive_fd)
114 /* we have to wait for some data to come to us */
e3bff60a 115 return -EAGAIN;
60f067b4 116
e3bff60a
MP
117 /* We know that source->filled is at most DATA_SIZE_MAX, so if
118 we reallocate it, we'll increase the size at least a bit. */
119 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
5eef597e 120 if (source->size - source->filled < LINE_CHUNK &&
e3bff60a 121 !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
5eef597e
MP
122 return log_oom();
123
e3bff60a 124 assert(source->buf);
5eef597e
MP
125 assert(source->size - source->filled >= LINE_CHUNK ||
126 source->size == ENTRY_SIZE_MAX);
127
e3bff60a
MP
128 n = read(source->fd,
129 source->buf + source->filled,
5eef597e
MP
130 source->size - source->filled);
131 if (n < 0) {
e3bff60a
MP
132 if (errno != EAGAIN)
133 log_error_errno(errno, "read(%d, ..., %zu): %m",
134 source->fd,
e735f4d4 135 source->size - source->filled);
5eef597e
MP
136 return -errno;
137 } else if (n == 0)
138 return 0;
139
140 source->filled += n;
60f067b4 141 }
5eef597e
MP
142
143 *line = source->buf + source->offset;
144 *size = c + 1 - source->buf - source->offset;
145 source->offset += *size;
60f067b4
JS
146
147 return 1;
148}
149
150int push_data(RemoteSource *source, const char *data, size_t size) {
151 assert(source);
152 assert(source->state != STATE_EOF);
153
5eef597e
MP
154 if (!realloc_buffer(source, source->filled + size)) {
155 log_error("Failed to store received data of size %zu "
156 "(in addition to existing %zu bytes with %zu filled): %s",
157 size, source->size, source->filled, strerror(ENOMEM));
158 return -ENOMEM;
159 }
60f067b4
JS
160
161 memcpy(source->buf + source->filled, data, size);
162 source->filled += size;
163
164 return 0;
165}
166
167static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
60f067b4
JS
168
169 assert(source);
170 assert(source->state == STATE_DATA_START ||
171 source->state == STATE_DATA ||
172 source->state == STATE_DATA_FINISH);
173 assert(size <= DATA_SIZE_MAX);
5eef597e 174 assert(source->offset <= source->filled);
60f067b4
JS
175 assert(source->filled <= source->size);
176 assert(source->buf != NULL || source->size == 0);
177 assert(source->buf == NULL || source->size > 0);
5eef597e 178 assert(source->fd >= 0);
60f067b4
JS
179 assert(data);
180
5eef597e
MP
181 while (source->filled - source->offset < size) {
182 int n;
183
184 if (source->passive_fd)
60f067b4 185 /* we have to wait for some data to come to us */
e3bff60a 186 return -EAGAIN;
60f067b4 187
5eef597e 188 if (!realloc_buffer(source, source->offset + size))
60f067b4
JS
189 return log_oom();
190
191 n = read(source->fd, source->buf + source->filled,
192 source->size - source->filled);
193 if (n < 0) {
e3bff60a 194 if (errno != EAGAIN)
e735f4d4
MP
195 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
196 source->size - source->filled);
60f067b4
JS
197 return -errno;
198 } else if (n == 0)
199 return 0;
200
201 source->filled += n;
202 }
203
5eef597e
MP
204 *data = source->buf + source->offset;
205 source->offset += size;
60f067b4
JS
206
207 return 1;
208}
209
210static int get_data_size(RemoteSource *source) {
211 int r;
5eef597e 212 void *data;
60f067b4
JS
213
214 assert(source);
215 assert(source->state == STATE_DATA_START);
216 assert(source->data_size == 0);
217
218 r = fill_fixed_size(source, &data, sizeof(uint64_t));
219 if (r <= 0)
220 return r;
221
222 source->data_size = le64toh( *(uint64_t *) data );
223 if (source->data_size > DATA_SIZE_MAX) {
5eef597e 224 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
60f067b4
JS
225 source->data_size, DATA_SIZE_MAX);
226 return -EINVAL;
227 }
228 if (source->data_size == 0)
229 log_warning("Binary field with zero length");
230
231 return 1;
232}
233
234static int get_data_data(RemoteSource *source, void **data) {
235 int r;
236
237 assert(source);
238 assert(data);
239 assert(source->state == STATE_DATA);
240
241 r = fill_fixed_size(source, data, source->data_size);
242 if (r <= 0)
243 return r;
244
245 return 1;
246}
247
248static int get_data_newline(RemoteSource *source) {
249 int r;
5eef597e 250 char *data;
60f067b4
JS
251
252 assert(source);
253 assert(source->state == STATE_DATA_FINISH);
254
255 r = fill_fixed_size(source, (void**) &data, 1);
256 if (r <= 0)
257 return r;
258
259 assert(data);
260 if (*data != '\n') {
261 log_error("expected newline, got '%c'", *data);
262 return -EINVAL;
263 }
264
265 return 1;
266}
267
268static int process_dunder(RemoteSource *source, char *line, size_t n) {
269 const char *timestamp;
270 int r;
271
272 assert(line);
273 assert(n > 0);
274 assert(line[n-1] == '\n');
275
276 /* XXX: is it worth to support timestamps in extended format?
277 * We don't produce them, but who knows... */
278
279 timestamp = startswith(line, "__CURSOR=");
280 if (timestamp)
281 /* ignore __CURSOR */
282 return 1;
283
284 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
285 if (timestamp) {
286 long long unsigned x;
287 line[n-1] = '\0';
288 r = safe_atollu(timestamp, &x);
289 if (r < 0)
290 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
291 else
292 source->ts.realtime = x;
293 return r < 0 ? r : 1;
294 }
295
296 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
297 if (timestamp) {
298 long long unsigned x;
299 line[n-1] = '\0';
300 r = safe_atollu(timestamp, &x);
301 if (r < 0)
302 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
303 else
304 source->ts.monotonic = x;
305 return r < 0 ? r : 1;
306 }
307
308 timestamp = startswith(line, "__");
309 if (timestamp) {
310 log_notice("Unknown dunder line %s", line);
311 return 1;
312 }
313
314 /* no dunder */
315 return 0;
316}
317
e3bff60a 318static int process_data(RemoteSource *source) {
60f067b4
JS
319 int r;
320
321 switch(source->state) {
322 case STATE_LINE: {
323 char *line, *sep;
e3bff60a 324 size_t n = 0;
60f067b4
JS
325
326 assert(source->data_size == 0);
327
328 r = get_line(source, &line, &n);
329 if (r < 0)
330 return r;
331 if (r == 0) {
332 source->state = STATE_EOF;
333 return r;
334 }
335 assert(n > 0);
336 assert(line[n-1] == '\n');
337
338 if (n == 1) {
5eef597e 339 log_trace("Received empty line, event is ready");
60f067b4
JS
340 return 1;
341 }
342
343 r = process_dunder(source, line, n);
5eef597e 344 if (r != 0)
60f067b4 345 return r < 0 ? r : 0;
60f067b4
JS
346
347 /* MESSAGE=xxx\n
348 or
349 COREDUMP\n
350 LLLLLLLL0011223344...\n
351 */
352 sep = memchr(line, '=', n);
e3bff60a 353 if (sep) {
60f067b4
JS
354 /* chomp newline */
355 n--;
e3bff60a
MP
356
357 r = iovw_put(&source->iovw, line, n);
358 if (r < 0)
359 return r;
360 } else {
60f067b4
JS
361 /* replace \n with = */
362 line[n-1] = '=';
60f067b4 363
e3bff60a
MP
364 source->field_len = n;
365 source->state = STATE_DATA_START;
366
367 /* we cannot put the field in iovec until we have all data */
60f067b4
JS
368 }
369
e3bff60a
MP
370 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
371
60f067b4
JS
372 return 0; /* continue */
373 }
374
375 case STATE_DATA_START:
376 assert(source->data_size == 0);
377
378 r = get_data_size(source);
5eef597e 379 // log_debug("get_data_size() -> %d", r);
60f067b4
JS
380 if (r < 0)
381 return r;
382 if (r == 0) {
383 source->state = STATE_EOF;
384 return 0;
385 }
386
387 source->state = source->data_size > 0 ?
388 STATE_DATA : STATE_DATA_FINISH;
389
390 return 0; /* continue */
391
392 case STATE_DATA: {
393 void *data;
e3bff60a 394 char *field;
60f067b4
JS
395
396 assert(source->data_size > 0);
397
398 r = get_data_data(source, &data);
5eef597e 399 // log_debug("get_data_data() -> %d", r);
60f067b4
JS
400 if (r < 0)
401 return r;
402 if (r == 0) {
403 source->state = STATE_EOF;
404 return 0;
405 }
406
407 assert(data);
408
e3bff60a
MP
409 field = (char*) data - sizeof(uint64_t) - source->field_len;
410 memmove(field + sizeof(uint64_t), field, source->field_len);
411
412 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
413 if (r < 0)
60f067b4 414 return r;
60f067b4
JS
415
416 source->state = STATE_DATA_FINISH;
417
418 return 0; /* continue */
419 }
420
421 case STATE_DATA_FINISH:
422 r = get_data_newline(source);
5eef597e 423 // log_debug("get_data_newline() -> %d", r);
60f067b4
JS
424 if (r < 0)
425 return r;
426 if (r == 0) {
427 source->state = STATE_EOF;
428 return 0;
429 }
430
431 source->data_size = 0;
432 source->state = STATE_LINE;
433
434 return 0; /* continue */
435 default:
436 assert_not_reached("wtf?");
437 }
438}
439
5eef597e
MP
440int process_source(RemoteSource *source, bool compress, bool seal) {
441 size_t remain, target;
60f067b4
JS
442 int r;
443
444 assert(source);
5eef597e 445 assert(source->writer);
60f067b4
JS
446
447 r = process_data(source);
448 if (r <= 0)
449 return r;
450
451 /* We have a full event */
e3bff60a 452 log_trace("Received full event from source@%p fd:%d (%s)",
5eef597e 453 source, source->fd, source->name);
60f067b4
JS
454
455 if (!source->iovw.count) {
456 log_warning("Entry with no payload, skipping");
457 goto freeing;
458 }
459
460 assert(source->iovw.iovec);
461 assert(source->iovw.count);
462
5eef597e 463 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
60f067b4 464 if (r < 0)
f47781d8
MP
465 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
466 iovw_size(&source->iovw));
60f067b4
JS
467 else
468 r = 1;
469
470 freeing:
471 iovw_free_contents(&source->iovw);
5eef597e
MP
472
473 /* possibly reset buffer position */
474 remain = source->filled - source->offset;
475
476 if (remain == 0) /* no brainer */
477 source->offset = source->scanned = source->filled = 0;
478 else if (source->offset > source->size - source->filled &&
479 source->offset > remain) {
480 memcpy(source->buf, source->buf + source->offset, remain);
481 source->offset = source->scanned = 0;
482 source->filled = remain;
483 }
484
485 target = source->size;
486 while (target > 16 * LINE_CHUNK && remain < target / 2)
487 target /= 2;
488 if (target < source->size) {
489 char *tmp;
490
491 tmp = realloc(source->buf, target);
492 if (!tmp)
493 log_warning("Failed to reallocate buffer to (smaller) size %zu",
494 target);
495 else {
496 log_debug("Reallocated buffer from %zu to %zu bytes",
497 source->size, target);
498 source->buf = tmp;
499 source->size = target;
500 }
501 }
502
60f067b4
JS
503 return r;
504}