]>
Commit | Line | Data |
---|---|---|
60f067b4 JS |
1 | /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ |
2 | ||
3 | /*** | |
4 | This file is part of systemd. | |
5 | ||
6 | Copyright 2014 Zbigniew Jędrzejewski-Szmek | |
7 | ||
8 | systemd is free software; you can redistribute it and/or modify it | |
9 | under the terms of the GNU Lesser General Public License as published by | |
10 | the Free Software Foundation; either version 2.1 of the License, or | |
11 | (at your option) any later version. | |
12 | ||
13 | systemd is distributed in the hope that it will be useful, but | |
14 | WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
16 | Lesser General Public License for more details. | |
17 | ||
18 | You should have received a copy of the GNU Lesser General Public License | |
19 | along with systemd; If not, see <http://www.gnu.org/licenses/>. | |
20 | ***/ | |
21 | ||
22 | #include "journal-remote-parse.h" | |
23 | #include "journald-native.h" | |
24 | ||
/* Granularity, in bytes, by which the receive buffer is grown while reading
 * lines. The expansion is parenthesized so that the macro behaves as a single
 * value inside any surrounding expression (e.g. division or modulo). */
#define LINE_CHUNK (8*1024u)
60f067b4 JS |
26 | |
27 | void source_free(RemoteSource *source) { | |
28 | if (!source) | |
29 | return; | |
30 | ||
5eef597e | 31 | if (source->fd >= 0 && !source->passive_fd) { |
60f067b4 | 32 | log_debug("Closing fd:%d (%s)", source->fd, source->name); |
5eef597e | 33 | safe_close(source->fd); |
60f067b4 | 34 | } |
5eef597e | 35 | |
60f067b4 JS |
36 | free(source->name); |
37 | free(source->buf); | |
38 | iovw_free_contents(&source->iovw); | |
5eef597e MP |
39 | |
40 | log_debug("Writer ref count %u", source->writer->n_ref); | |
41 | writer_unref(source->writer); | |
42 | ||
43 | sd_event_source_unref(source->event); | |
44 | ||
60f067b4 JS |
45 | free(source); |
46 | } | |
47 | ||
5eef597e MP |
48 | /** |
49 | * Initialize zero-filled source with given values. On success, takes | |
50 | * ownerhship of fd and writer, otherwise does not touch them. | |
51 | */ | |
52 | RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) { | |
53 | ||
54 | RemoteSource *source; | |
55 | ||
56 | log_debug("Creating source for %sfd:%d (%s)", | |
57 | passive_fd ? "passive " : "", fd, name); | |
58 | ||
59 | assert(fd >= 0); | |
60 | ||
61 | source = new0(RemoteSource, 1); | |
62 | if (!source) | |
63 | return NULL; | |
64 | ||
65 | source->fd = fd; | |
66 | source->passive_fd = passive_fd; | |
67 | source->name = name; | |
68 | source->writer = writer; | |
69 | ||
70 | return source; | |
71 | } | |
72 | ||
73 | static char* realloc_buffer(RemoteSource *source, size_t size) { | |
74 | char *b, *old = source->buf; | |
75 | ||
76 | b = GREEDY_REALLOC(source->buf, source->size, size); | |
77 | if (!b) | |
78 | return NULL; | |
79 | ||
80 | iovw_rebase(&source->iovw, old, source->buf); | |
81 | ||
82 | return b; | |
83 | } | |
84 | ||
60f067b4 | 85 | static int get_line(RemoteSource *source, char **line, size_t *size) { |
5eef597e | 86 | ssize_t n; |
60f067b4 | 87 | char *c = NULL; |
60f067b4 JS |
88 | |
89 | assert(source); | |
90 | assert(source->state == STATE_LINE); | |
5eef597e | 91 | assert(source->offset <= source->filled); |
60f067b4 JS |
92 | assert(source->filled <= source->size); |
93 | assert(source->buf == NULL || source->size > 0); | |
5eef597e | 94 | assert(source->fd >= 0); |
60f067b4 | 95 | |
5eef597e MP |
96 | while (true) { |
97 | if (source->buf) { | |
98 | size_t start = MAX(source->scanned, source->offset); | |
60f067b4 | 99 | |
5eef597e MP |
100 | c = memchr(source->buf + start, '\n', |
101 | source->filled - start); | |
102 | if (c != NULL) | |
103 | break; | |
104 | } | |
60f067b4 | 105 | |
5eef597e MP |
106 | source->scanned = source->filled; |
107 | if (source->scanned >= DATA_SIZE_MAX) { | |
108 | log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX); | |
109 | return -E2BIG; | |
110 | } | |
60f067b4 | 111 | |
5eef597e MP |
112 | if (source->passive_fd) |
113 | /* we have to wait for some data to come to us */ | |
114 | return -EWOULDBLOCK; | |
60f067b4 | 115 | |
5eef597e MP |
116 | if (source->size - source->filled < LINE_CHUNK && |
117 | !realloc_buffer(source, | |
118 | MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX))) | |
119 | return log_oom(); | |
120 | ||
121 | assert(source->size - source->filled >= LINE_CHUNK || | |
122 | source->size == ENTRY_SIZE_MAX); | |
123 | ||
124 | n = read(source->fd, source->buf + source->filled, | |
125 | source->size - source->filled); | |
126 | if (n < 0) { | |
127 | if (errno != EAGAIN && errno != EWOULDBLOCK) | |
128 | log_error("read(%d, ..., %zd): %m", source->fd, | |
129 | source->size - source->filled); | |
130 | return -errno; | |
131 | } else if (n == 0) | |
132 | return 0; | |
133 | ||
134 | source->filled += n; | |
60f067b4 | 135 | } |
5eef597e MP |
136 | |
137 | *line = source->buf + source->offset; | |
138 | *size = c + 1 - source->buf - source->offset; | |
139 | source->offset += *size; | |
60f067b4 JS |
140 | |
141 | return 1; | |
142 | } | |
143 | ||
144 | int push_data(RemoteSource *source, const char *data, size_t size) { | |
145 | assert(source); | |
146 | assert(source->state != STATE_EOF); | |
147 | ||
5eef597e MP |
148 | if (!realloc_buffer(source, source->filled + size)) { |
149 | log_error("Failed to store received data of size %zu " | |
150 | "(in addition to existing %zu bytes with %zu filled): %s", | |
151 | size, source->size, source->filled, strerror(ENOMEM)); | |
152 | return -ENOMEM; | |
153 | } | |
60f067b4 JS |
154 | |
155 | memcpy(source->buf + source->filled, data, size); | |
156 | source->filled += size; | |
157 | ||
158 | return 0; | |
159 | } | |
160 | ||
161 | static int fill_fixed_size(RemoteSource *source, void **data, size_t size) { | |
60f067b4 JS |
162 | |
163 | assert(source); | |
164 | assert(source->state == STATE_DATA_START || | |
165 | source->state == STATE_DATA || | |
166 | source->state == STATE_DATA_FINISH); | |
167 | assert(size <= DATA_SIZE_MAX); | |
5eef597e | 168 | assert(source->offset <= source->filled); |
60f067b4 JS |
169 | assert(source->filled <= source->size); |
170 | assert(source->buf != NULL || source->size == 0); | |
171 | assert(source->buf == NULL || source->size > 0); | |
5eef597e | 172 | assert(source->fd >= 0); |
60f067b4 JS |
173 | assert(data); |
174 | ||
5eef597e MP |
175 | while (source->filled - source->offset < size) { |
176 | int n; | |
177 | ||
178 | if (source->passive_fd) | |
60f067b4 JS |
179 | /* we have to wait for some data to come to us */ |
180 | return -EWOULDBLOCK; | |
181 | ||
5eef597e | 182 | if (!realloc_buffer(source, source->offset + size)) |
60f067b4 JS |
183 | return log_oom(); |
184 | ||
185 | n = read(source->fd, source->buf + source->filled, | |
186 | source->size - source->filled); | |
187 | if (n < 0) { | |
188 | if (errno != EAGAIN && errno != EWOULDBLOCK) | |
189 | log_error("read(%d, ..., %zd): %m", source->fd, | |
190 | source->size - source->filled); | |
191 | return -errno; | |
192 | } else if (n == 0) | |
193 | return 0; | |
194 | ||
195 | source->filled += n; | |
196 | } | |
197 | ||
5eef597e MP |
198 | *data = source->buf + source->offset; |
199 | source->offset += size; | |
60f067b4 JS |
200 | |
201 | return 1; | |
202 | } | |
203 | ||
204 | static int get_data_size(RemoteSource *source) { | |
205 | int r; | |
5eef597e | 206 | void *data; |
60f067b4 JS |
207 | |
208 | assert(source); | |
209 | assert(source->state == STATE_DATA_START); | |
210 | assert(source->data_size == 0); | |
211 | ||
212 | r = fill_fixed_size(source, &data, sizeof(uint64_t)); | |
213 | if (r <= 0) | |
214 | return r; | |
215 | ||
216 | source->data_size = le64toh( *(uint64_t *) data ); | |
217 | if (source->data_size > DATA_SIZE_MAX) { | |
5eef597e | 218 | log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u", |
60f067b4 JS |
219 | source->data_size, DATA_SIZE_MAX); |
220 | return -EINVAL; | |
221 | } | |
222 | if (source->data_size == 0) | |
223 | log_warning("Binary field with zero length"); | |
224 | ||
225 | return 1; | |
226 | } | |
227 | ||
228 | static int get_data_data(RemoteSource *source, void **data) { | |
229 | int r; | |
230 | ||
231 | assert(source); | |
232 | assert(data); | |
233 | assert(source->state == STATE_DATA); | |
234 | ||
235 | r = fill_fixed_size(source, data, source->data_size); | |
236 | if (r <= 0) | |
237 | return r; | |
238 | ||
239 | return 1; | |
240 | } | |
241 | ||
242 | static int get_data_newline(RemoteSource *source) { | |
243 | int r; | |
5eef597e | 244 | char *data; |
60f067b4 JS |
245 | |
246 | assert(source); | |
247 | assert(source->state == STATE_DATA_FINISH); | |
248 | ||
249 | r = fill_fixed_size(source, (void**) &data, 1); | |
250 | if (r <= 0) | |
251 | return r; | |
252 | ||
253 | assert(data); | |
254 | if (*data != '\n') { | |
255 | log_error("expected newline, got '%c'", *data); | |
256 | return -EINVAL; | |
257 | } | |
258 | ||
259 | return 1; | |
260 | } | |
261 | ||
262 | static int process_dunder(RemoteSource *source, char *line, size_t n) { | |
263 | const char *timestamp; | |
264 | int r; | |
265 | ||
266 | assert(line); | |
267 | assert(n > 0); | |
268 | assert(line[n-1] == '\n'); | |
269 | ||
270 | /* XXX: is it worth to support timestamps in extended format? | |
271 | * We don't produce them, but who knows... */ | |
272 | ||
273 | timestamp = startswith(line, "__CURSOR="); | |
274 | if (timestamp) | |
275 | /* ignore __CURSOR */ | |
276 | return 1; | |
277 | ||
278 | timestamp = startswith(line, "__REALTIME_TIMESTAMP="); | |
279 | if (timestamp) { | |
280 | long long unsigned x; | |
281 | line[n-1] = '\0'; | |
282 | r = safe_atollu(timestamp, &x); | |
283 | if (r < 0) | |
284 | log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp); | |
285 | else | |
286 | source->ts.realtime = x; | |
287 | return r < 0 ? r : 1; | |
288 | } | |
289 | ||
290 | timestamp = startswith(line, "__MONOTONIC_TIMESTAMP="); | |
291 | if (timestamp) { | |
292 | long long unsigned x; | |
293 | line[n-1] = '\0'; | |
294 | r = safe_atollu(timestamp, &x); | |
295 | if (r < 0) | |
296 | log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp); | |
297 | else | |
298 | source->ts.monotonic = x; | |
299 | return r < 0 ? r : 1; | |
300 | } | |
301 | ||
302 | timestamp = startswith(line, "__"); | |
303 | if (timestamp) { | |
304 | log_notice("Unknown dunder line %s", line); | |
305 | return 1; | |
306 | } | |
307 | ||
308 | /* no dunder */ | |
309 | return 0; | |
310 | } | |
311 | ||
312 | int process_data(RemoteSource *source) { | |
313 | int r; | |
314 | ||
315 | switch(source->state) { | |
316 | case STATE_LINE: { | |
317 | char *line, *sep; | |
318 | size_t n; | |
319 | ||
320 | assert(source->data_size == 0); | |
321 | ||
322 | r = get_line(source, &line, &n); | |
323 | if (r < 0) | |
324 | return r; | |
325 | if (r == 0) { | |
326 | source->state = STATE_EOF; | |
327 | return r; | |
328 | } | |
329 | assert(n > 0); | |
330 | assert(line[n-1] == '\n'); | |
331 | ||
332 | if (n == 1) { | |
5eef597e | 333 | log_trace("Received empty line, event is ready"); |
60f067b4 JS |
334 | return 1; |
335 | } | |
336 | ||
337 | r = process_dunder(source, line, n); | |
5eef597e | 338 | if (r != 0) |
60f067b4 | 339 | return r < 0 ? r : 0; |
60f067b4 JS |
340 | |
341 | /* MESSAGE=xxx\n | |
342 | or | |
343 | COREDUMP\n | |
344 | LLLLLLLL0011223344...\n | |
345 | */ | |
346 | sep = memchr(line, '=', n); | |
347 | if (sep) | |
348 | /* chomp newline */ | |
349 | n--; | |
350 | else | |
351 | /* replace \n with = */ | |
352 | line[n-1] = '='; | |
5eef597e | 353 | log_trace("Received: %.*s", (int) n, line); |
60f067b4 JS |
354 | |
355 | r = iovw_put(&source->iovw, line, n); | |
356 | if (r < 0) { | |
357 | log_error("Failed to put line in iovect"); | |
60f067b4 JS |
358 | return r; |
359 | } | |
360 | ||
361 | if (!sep) | |
362 | source->state = STATE_DATA_START; | |
363 | return 0; /* continue */ | |
364 | } | |
365 | ||
366 | case STATE_DATA_START: | |
367 | assert(source->data_size == 0); | |
368 | ||
369 | r = get_data_size(source); | |
5eef597e | 370 | // log_debug("get_data_size() -> %d", r); |
60f067b4 JS |
371 | if (r < 0) |
372 | return r; | |
373 | if (r == 0) { | |
374 | source->state = STATE_EOF; | |
375 | return 0; | |
376 | } | |
377 | ||
378 | source->state = source->data_size > 0 ? | |
379 | STATE_DATA : STATE_DATA_FINISH; | |
380 | ||
381 | return 0; /* continue */ | |
382 | ||
383 | case STATE_DATA: { | |
384 | void *data; | |
385 | ||
386 | assert(source->data_size > 0); | |
387 | ||
388 | r = get_data_data(source, &data); | |
5eef597e | 389 | // log_debug("get_data_data() -> %d", r); |
60f067b4 JS |
390 | if (r < 0) |
391 | return r; | |
392 | if (r == 0) { | |
393 | source->state = STATE_EOF; | |
394 | return 0; | |
395 | } | |
396 | ||
397 | assert(data); | |
398 | ||
399 | r = iovw_put(&source->iovw, data, source->data_size); | |
400 | if (r < 0) { | |
401 | log_error("failed to put binary buffer in iovect"); | |
402 | return r; | |
403 | } | |
404 | ||
405 | source->state = STATE_DATA_FINISH; | |
406 | ||
407 | return 0; /* continue */ | |
408 | } | |
409 | ||
410 | case STATE_DATA_FINISH: | |
411 | r = get_data_newline(source); | |
5eef597e | 412 | // log_debug("get_data_newline() -> %d", r); |
60f067b4 JS |
413 | if (r < 0) |
414 | return r; | |
415 | if (r == 0) { | |
416 | source->state = STATE_EOF; | |
417 | return 0; | |
418 | } | |
419 | ||
420 | source->data_size = 0; | |
421 | source->state = STATE_LINE; | |
422 | ||
423 | return 0; /* continue */ | |
424 | default: | |
425 | assert_not_reached("wtf?"); | |
426 | } | |
427 | } | |
428 | ||
5eef597e MP |
429 | int process_source(RemoteSource *source, bool compress, bool seal) { |
430 | size_t remain, target; | |
60f067b4 JS |
431 | int r; |
432 | ||
433 | assert(source); | |
5eef597e | 434 | assert(source->writer); |
60f067b4 JS |
435 | |
436 | r = process_data(source); | |
437 | if (r <= 0) | |
438 | return r; | |
439 | ||
440 | /* We have a full event */ | |
5eef597e MP |
441 | log_trace("Received a full event from source@%p fd:%d (%s)", |
442 | source, source->fd, source->name); | |
60f067b4 JS |
443 | |
444 | if (!source->iovw.count) { | |
445 | log_warning("Entry with no payload, skipping"); | |
446 | goto freeing; | |
447 | } | |
448 | ||
449 | assert(source->iovw.iovec); | |
450 | assert(source->iovw.count); | |
451 | ||
5eef597e | 452 | r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal); |
60f067b4 JS |
453 | if (r < 0) |
454 | log_error("Failed to write entry of %zu bytes: %s", | |
455 | iovw_size(&source->iovw), strerror(-r)); | |
456 | else | |
457 | r = 1; | |
458 | ||
459 | freeing: | |
460 | iovw_free_contents(&source->iovw); | |
5eef597e MP |
461 | |
462 | /* possibly reset buffer position */ | |
463 | remain = source->filled - source->offset; | |
464 | ||
465 | if (remain == 0) /* no brainer */ | |
466 | source->offset = source->scanned = source->filled = 0; | |
467 | else if (source->offset > source->size - source->filled && | |
468 | source->offset > remain) { | |
469 | memcpy(source->buf, source->buf + source->offset, remain); | |
470 | source->offset = source->scanned = 0; | |
471 | source->filled = remain; | |
472 | } | |
473 | ||
474 | target = source->size; | |
475 | while (target > 16 * LINE_CHUNK && remain < target / 2) | |
476 | target /= 2; | |
477 | if (target < source->size) { | |
478 | char *tmp; | |
479 | ||
480 | tmp = realloc(source->buf, target); | |
481 | if (!tmp) | |
482 | log_warning("Failed to reallocate buffer to (smaller) size %zu", | |
483 | target); | |
484 | else { | |
485 | log_debug("Reallocated buffer from %zu to %zu bytes", | |
486 | source->size, target); | |
487 | source->buf = tmp; | |
488 | source->size = target; | |
489 | } | |
490 | } | |
491 | ||
60f067b4 JS |
492 | return r; |
493 | } |