]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | #include <thrift/thrift-config.h> | |
21 | ||
22 | #include <thrift/transport/TFileTransport.h> | |
23 | #include <thrift/transport/TTransportUtils.h> | |
24 | #include <thrift/transport/PlatformSocket.h> | |
25 | #include <thrift/concurrency/FunctionRunner.h> | |
26 | ||
27 | #include <boost/version.hpp> | |
28 | ||
29 | #ifdef HAVE_SYS_TIME_H | |
30 | #include <sys/time.h> | |
31 | #else | |
32 | #include <time.h> | |
33 | #endif | |
34 | #include <fcntl.h> | |
35 | #ifdef HAVE_UNISTD_H | |
36 | #include <unistd.h> | |
37 | #endif | |
38 | #ifdef HAVE_STRINGS_H | |
39 | #include <strings.h> | |
40 | #endif | |
41 | #include <cstdlib> | |
42 | #include <cstring> | |
43 | #include <iostream> | |
44 | #include <limits> | |
45 | #include <memory> | |
46 | #ifdef HAVE_SYS_STAT_H | |
47 | #include <sys/stat.h> | |
48 | #endif | |
49 | ||
50 | #ifdef _WIN32 | |
51 | #include <io.h> | |
52 | #endif | |
53 | ||
54 | namespace apache { | |
55 | namespace thrift { | |
56 | namespace transport { | |
57 | ||
58 | using std::shared_ptr; | |
59 | using std::cerr; | |
60 | using std::cout; | |
61 | using std::endl; | |
62 | using std::string; | |
63 | using namespace apache::thrift::protocol; | |
64 | using namespace apache::thrift::concurrency; | |
65 | ||
/**
 * Constructs a file transport bound to `path`.
 *
 * Every tunable starts from its DEFAULT_* constant. The read buffer, the
 * current event and both event buffers start out as nullptr; they are
 * allocated later (readEvent() / initBufferAndWriteThread()).
 * The three monitors (notFull_, notEmpty_, flushed_) all share mutex_.
 * The log file is opened immediately via openLogFile(), which throws
 * TTransportException if the open fails.
 *
 * @param path     file to read from / append to
 * @param readOnly if true the file is opened read-only and write() throws
 */
TFileTransport::TFileTransport(string path, bool readOnly)
  : readState_(),
    readBuff_(nullptr),
    currentEvent_(nullptr),
    readBuffSize_(DEFAULT_READ_BUFF_SIZE),
    readTimeout_(NO_TAIL_READ_TIMEOUT),
    chunkSize_(DEFAULT_CHUNK_SIZE),
    eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE),
    flushMaxUs_(DEFAULT_FLUSH_MAX_US),
    flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES),
    maxEventSize_(DEFAULT_MAX_EVENT_SIZE),
    maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS),
    eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US),
    corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US),
    writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US),
    dequeueBuffer_(nullptr),
    enqueueBuffer_(nullptr),
    notFull_(&mutex_),
    notEmpty_(&mutex_),
    closing_(false),
    flushed_(&mutex_),
    forceFlush_(false),
    filename_(path),
    fd_(0), // 0 is used throughout this file as the "no file open" sentinel
    bufferAndThreadInitialized_(false),
    offset_(0),
    lastBadChunk_(0),
    numCorruptedEventsInChunk_(0),
    readOnly_(readOnly) {
  // The destructor joins the writer thread, so it must not be detached.
  threadFactory_.setDetached(false);
  openLogFile();
}
98 | ||
99 | void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) { | |
100 | filename_ = filename; | |
101 | offset_ = offset; | |
102 | ||
103 | // check if current file is still open | |
104 | if (fd_ > 0) { | |
105 | // flush any events in the queue | |
106 | flush(); | |
107 | GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str()); | |
108 | if (-1 == ::THRIFT_CLOSE(fd_)) { | |
109 | int errno_copy = THRIFT_ERRNO; | |
110 | GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy); | |
111 | throw TTransportException(TTransportException::UNKNOWN, | |
112 | "TFileTransport: error in file close", | |
113 | errno_copy); | |
114 | } else { | |
115 | // successfully closed fd | |
116 | fd_ = 0; | |
117 | } | |
118 | } | |
119 | ||
120 | if (fd) { | |
121 | fd_ = fd; | |
122 | } else { | |
123 | // open file if the input fd is 0 | |
124 | openLogFile(); | |
125 | } | |
126 | } | |
127 | ||
128 | TFileTransport::~TFileTransport() { | |
129 | // flush the buffer if a writer thread is active | |
130 | if (writerThread_.get()) { | |
131 | // set state to closing | |
132 | closing_ = true; | |
133 | ||
134 | // wake up the writer thread | |
135 | // Since closing_ is true, it will attempt to flush all data, then exit. | |
136 | notEmpty_.notify(); | |
137 | ||
138 | writerThread_->join(); | |
139 | writerThread_.reset(); | |
140 | } | |
141 | ||
142 | if (dequeueBuffer_) { | |
143 | delete dequeueBuffer_; | |
144 | dequeueBuffer_ = nullptr; | |
145 | } | |
146 | ||
147 | if (enqueueBuffer_) { | |
148 | delete enqueueBuffer_; | |
149 | enqueueBuffer_ = nullptr; | |
150 | } | |
151 | ||
152 | if (readBuff_) { | |
153 | delete[] readBuff_; | |
154 | readBuff_ = nullptr; | |
155 | } | |
156 | ||
157 | if (currentEvent_) { | |
158 | delete currentEvent_; | |
159 | currentEvent_ = nullptr; | |
160 | } | |
161 | ||
162 | // close logfile | |
163 | if (fd_ > 0) { | |
164 | if (-1 == ::THRIFT_CLOSE(fd_)) { | |
165 | GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_ERRNO); | |
166 | } else { | |
167 | // successfully closed fd | |
168 | fd_ = 0; | |
169 | } | |
170 | } | |
171 | } | |
172 | ||
173 | bool TFileTransport::initBufferAndWriteThread() { | |
174 | if (bufferAndThreadInitialized_) { | |
175 | T_ERROR("%s", "Trying to double-init TFileTransport"); | |
176 | return false; | |
177 | } | |
178 | ||
179 | if (!writerThread_.get()) { | |
180 | writerThread_ = threadFactory_.newThread( | |
181 | apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this)); | |
182 | writerThread_->start(); | |
183 | } | |
184 | ||
185 | dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_); | |
186 | enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_); | |
187 | bufferAndThreadInitialized_ = true; | |
188 | ||
189 | return true; | |
190 | } | |
191 | ||
192 | void TFileTransport::write(const uint8_t* buf, uint32_t len) { | |
193 | if (readOnly_) { | |
194 | throw TTransportException("TFileTransport: attempting to write to file opened readonly"); | |
195 | } | |
196 | ||
197 | enqueueEvent(buf, len); | |
198 | } | |
199 | ||
/**
 * std::unique_ptr deleter that releases its pointee with plain `delete`.
 *
 * The template parameter is named `T`. Identifiers that begin with an
 * underscore followed by an uppercase letter (such as `_T`) are reserved for
 * the implementation in C++, and using one is undefined behaviour.
 */
template <class T>
struct uniqueDeleter
{
  void operator()(T *ptr) const { delete ptr; }
};
205 | ||
206 | void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) { | |
207 | // can't enqueue more events if file is going to close | |
208 | if (closing_) { | |
209 | return; | |
210 | } | |
211 | ||
212 | // make sure that event size is valid | |
213 | if ((maxEventSize_ > 0) && (eventLen > maxEventSize_)) { | |
214 | T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_); | |
215 | return; | |
216 | } | |
217 | ||
218 | if (eventLen == 0) { | |
219 | T_ERROR("%s", "cannot enqueue an empty event"); | |
220 | return; | |
221 | } | |
222 | ||
223 | std::unique_ptr<eventInfo, uniqueDeleter<eventInfo> > toEnqueue(new eventInfo()); | |
224 | toEnqueue->eventBuff_ = new uint8_t[(sizeof(uint8_t) * eventLen) + 4]; | |
225 | ||
226 | // first 4 bytes is the event length | |
227 | memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4); | |
228 | // actual event contents | |
229 | memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen); | |
230 | toEnqueue->eventSize_ = eventLen + 4; | |
231 | ||
232 | // lock mutex | |
233 | Guard g(mutex_); | |
234 | ||
235 | // make sure that enqueue buffer is initialized and writer thread is running | |
236 | if (!bufferAndThreadInitialized_) { | |
237 | if (!initBufferAndWriteThread()) { | |
238 | return; | |
239 | } | |
240 | } | |
241 | ||
242 | // Can't enqueue while buffer is full | |
243 | while (enqueueBuffer_->isFull()) { | |
244 | notFull_.wait(); | |
245 | } | |
246 | ||
247 | // We shouldn't be trying to enqueue new data while a forced flush is | |
248 | // requested. (Otherwise the writer thread might not ever be able to finish | |
249 | // the flush if more data keeps being enqueued.) | |
250 | assert(!forceFlush_); | |
251 | ||
252 | // add to the buffer | |
253 | eventInfo* pEvent = toEnqueue.release(); | |
254 | if (!enqueueBuffer_->addEvent(pEvent)) { | |
255 | delete pEvent; | |
256 | return; | |
257 | } | |
258 | ||
259 | // signal anybody who's waiting for the buffer to be non-empty | |
260 | notEmpty_.notify(); | |
261 | ||
262 | // this really should be a loop where it makes sure it got flushed | |
263 | // because condition variables can get triggered by the os for no reason | |
264 | // it is probably a non-factor for the time being | |
265 | } | |
266 | ||
267 | bool TFileTransport::swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline) { | |
268 | bool swap; | |
269 | Guard g(mutex_); | |
270 | ||
271 | if (!enqueueBuffer_->isEmpty()) { | |
272 | swap = true; | |
273 | } else if (closing_) { | |
274 | // even though there is no data to write, | |
275 | // return immediately if the transport is closing | |
276 | swap = false; | |
277 | } else { | |
278 | if (deadline != nullptr) { | |
279 | // if we were handed a deadline time struct, do a timed wait | |
280 | notEmpty_.waitForTime(*deadline); | |
281 | } else { | |
282 | // just wait until the buffer gets an item | |
283 | notEmpty_.wait(); | |
284 | } | |
285 | ||
286 | // could be empty if we timed out | |
287 | swap = enqueueBuffer_->isEmpty(); | |
288 | } | |
289 | ||
290 | if (swap) { | |
291 | TFileTransportBuffer* temp = enqueueBuffer_; | |
292 | enqueueBuffer_ = dequeueBuffer_; | |
293 | dequeueBuffer_ = temp; | |
294 | } | |
295 | ||
296 | if (swap) { | |
297 | notFull_.notify(); | |
298 | } | |
299 | ||
300 | return swap; | |
301 | } | |
302 | ||
303 | void TFileTransport::writerThread() { | |
304 | bool hasIOError = false; | |
305 | ||
306 | // open file if it is not open | |
307 | if (!fd_) { | |
308 | try { | |
309 | openLogFile(); | |
310 | } catch (...) { | |
311 | int errno_copy = THRIFT_ERRNO; | |
312 | GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy); | |
313 | fd_ = 0; | |
314 | hasIOError = true; | |
315 | } | |
316 | } | |
317 | ||
318 | // set the offset to the correct value (EOF) | |
319 | if (!hasIOError) { | |
320 | try { | |
321 | seekToEnd(); | |
322 | // throw away any partial events | |
323 | offset_ += readState_.lastDispatchPtr_; | |
324 | if (0 == THRIFT_FTRUNCATE(fd_, offset_)) { | |
325 | readState_.resetAllValues(); | |
326 | } else { | |
327 | int errno_copy = THRIFT_ERRNO; | |
328 | GlobalOutput.perror("TFileTransport: writerThread() truncate ", errno_copy); | |
329 | hasIOError = true; | |
330 | } | |
331 | } catch (...) { | |
332 | int errno_copy = THRIFT_ERRNO; | |
333 | GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy); | |
334 | hasIOError = true; | |
335 | } | |
336 | } | |
337 | ||
338 | // Figure out the next time by which a flush must take place | |
339 | auto ts_next_flush = getNextFlushTime(); | |
340 | uint32_t unflushed = 0; | |
341 | ||
342 | while (1) { | |
343 | // this will only be true when the destructor is being invoked | |
344 | if (closing_) { | |
345 | if (hasIOError) { | |
346 | return; | |
347 | } | |
348 | ||
349 | // Try to empty buffers before exit | |
350 | if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) { | |
351 | ::THRIFT_FSYNC(fd_); | |
352 | if (-1 == ::THRIFT_CLOSE(fd_)) { | |
353 | int errno_copy = THRIFT_ERRNO; | |
354 | GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy); | |
355 | } else { | |
356 | // fd successfully closed | |
357 | fd_ = 0; | |
358 | } | |
359 | return; | |
360 | } | |
361 | } | |
362 | ||
363 | if (swapEventBuffers(&ts_next_flush)) { | |
364 | eventInfo* outEvent; | |
365 | while (nullptr != (outEvent = dequeueBuffer_->getNext())) { | |
366 | // Remove an event from the buffer and write it out to disk. If there is any IO error, for | |
367 | // instance, | |
368 | // the output file is unmounted or deleted, then this event is dropped. However, the writer | |
369 | // thread | |
370 | // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then | |
371 | // start writing | |
372 | // from the end. | |
373 | ||
374 | while (hasIOError) { | |
375 | T_ERROR( | |
376 | "TFileTransport: writer thread going to sleep for %u microseconds due to IO errors", | |
377 | writerThreadIOErrorSleepTime_); | |
378 | THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_); | |
379 | if (closing_) { | |
380 | return; | |
381 | } | |
382 | if (!fd_) { | |
383 | ::THRIFT_CLOSE(fd_); | |
384 | fd_ = 0; | |
385 | } | |
386 | try { | |
387 | openLogFile(); | |
388 | seekToEnd(); | |
389 | unflushed = 0; | |
390 | hasIOError = false; | |
391 | T_LOG_OPER( | |
392 | "TFileTransport: log file %s reopened by writer thread during error recovery", | |
393 | filename_.c_str()); | |
394 | } catch (...) { | |
395 | T_ERROR("TFileTransport: unable to reopen log file %s during error recovery", | |
396 | filename_.c_str()); | |
397 | } | |
398 | } | |
399 | ||
400 | // sanity check on event | |
401 | if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) { | |
402 | T_ERROR("msg size is greater than max event size: %u > %u\n", | |
403 | outEvent->eventSize_, | |
404 | maxEventSize_); | |
405 | continue; | |
406 | } | |
407 | ||
408 | // If chunking is required, then make sure that msg does not cross chunk boundary | |
409 | if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) { | |
410 | // event size must be less than chunk size | |
411 | if (outEvent->eventSize_ > chunkSize_) { | |
412 | T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event", | |
413 | outEvent->eventSize_, | |
414 | chunkSize_); | |
415 | continue; | |
416 | } | |
417 | ||
418 | int64_t chunk1 = offset_ / chunkSize_; | |
419 | int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1) / chunkSize_; | |
420 | ||
421 | // if adding this event will cross a chunk boundary, pad the chunk with zeros | |
422 | if (chunk1 != chunk2) { | |
423 | // refetch the offset to keep in sync | |
424 | offset_ = THRIFT_LSEEK(fd_, 0, SEEK_CUR); | |
425 | auto padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_); | |
426 | ||
427 | auto* zeros = new uint8_t[padding]; | |
428 | memset(zeros, '\0', padding); | |
429 | boost::scoped_array<uint8_t> array(zeros); | |
430 | if (-1 == ::THRIFT_WRITE(fd_, zeros, padding)) { | |
431 | int errno_copy = THRIFT_ERRNO; | |
432 | GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", | |
433 | errno_copy); | |
434 | hasIOError = true; | |
435 | continue; | |
436 | } | |
437 | unflushed += padding; | |
438 | offset_ += padding; | |
439 | } | |
440 | } | |
441 | ||
442 | // write the dequeued event to the file | |
443 | if (outEvent->eventSize_ > 0) { | |
444 | if (-1 == ::THRIFT_WRITE(fd_, outEvent->eventBuff_, outEvent->eventSize_)) { | |
445 | int errno_copy = THRIFT_ERRNO; | |
446 | GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy); | |
447 | hasIOError = true; | |
448 | continue; | |
449 | } | |
450 | unflushed += outEvent->eventSize_; | |
451 | offset_ += outEvent->eventSize_; | |
452 | } | |
453 | } | |
454 | dequeueBuffer_->reset(); | |
455 | } | |
456 | ||
457 | if (hasIOError) { | |
458 | continue; | |
459 | } | |
460 | ||
461 | // Local variable to cache the state of forceFlush_. | |
462 | // | |
463 | // We only want to check the value of forceFlush_ once each time around the | |
464 | // loop. If we check it more than once without holding the lock the entire | |
465 | // time, it could have changed state in between. This will result in us | |
466 | // making inconsistent decisions. | |
467 | bool forced_flush = false; | |
468 | { | |
469 | Guard g(mutex_); | |
470 | if (forceFlush_) { | |
471 | if (!enqueueBuffer_->isEmpty()) { | |
472 | // If forceFlush_ is true, we need to flush all available data. | |
473 | // If enqueueBuffer_ is not empty, go back to the start of the loop to | |
474 | // write it out. | |
475 | // | |
476 | // We know the main thread is waiting on forceFlush_ to be cleared, | |
477 | // so no new events will be added to enqueueBuffer_ until we clear | |
478 | // forceFlush_. Therefore the next time around the loop enqueueBuffer_ | |
479 | // is guaranteed to be empty. (I.e., we're guaranteed to make progress | |
480 | // and clear forceFlush_ the next time around the loop.) | |
481 | continue; | |
482 | } | |
483 | forced_flush = true; | |
484 | } | |
485 | } | |
486 | ||
487 | // determine if we need to perform an fsync | |
488 | bool flush = false; | |
489 | if (forced_flush || unflushed > flushMaxBytes_) { | |
490 | flush = true; | |
491 | } else { | |
492 | if (std::chrono::steady_clock::now() > ts_next_flush) { | |
493 | if (unflushed > 0) { | |
494 | flush = true; | |
495 | } else { | |
496 | // If there is no new data since the last fsync, | |
497 | // don't perform the fsync, but do reset the timer. | |
498 | ts_next_flush = getNextFlushTime(); | |
499 | } | |
500 | } | |
501 | } | |
502 | ||
503 | if (flush) { | |
504 | // sync (force flush) file to disk | |
505 | THRIFT_FSYNC(fd_); | |
506 | unflushed = 0; | |
507 | ts_next_flush = getNextFlushTime(); | |
508 | ||
509 | // notify anybody waiting for flush completion | |
510 | if (forced_flush) { | |
511 | Guard g(mutex_); | |
512 | forceFlush_ = false; | |
513 | assert(enqueueBuffer_->isEmpty()); | |
514 | assert(dequeueBuffer_->isEmpty()); | |
515 | flushed_.notifyAll(); | |
516 | } | |
517 | } | |
518 | } | |
519 | } | |
520 | ||
521 | void TFileTransport::flush() { | |
522 | // file must be open for writing for any flushing to take place | |
523 | if (!writerThread_.get()) { | |
524 | return; | |
525 | } | |
526 | // wait for flush to take place | |
527 | Guard g(mutex_); | |
528 | ||
529 | // Indicate that we are requesting a flush | |
530 | forceFlush_ = true; | |
531 | // Wake up the writer thread so it will perform the flush immediately | |
532 | notEmpty_.notify(); | |
533 | ||
534 | while (forceFlush_) { | |
535 | flushed_.wait(); | |
536 | } | |
537 | } | |
538 | ||
539 | uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) { | |
540 | uint32_t have = 0; | |
541 | uint32_t get = 0; | |
542 | ||
543 | while (have < len) { | |
544 | get = read(buf + have, len - have); | |
545 | if (get <= 0) { | |
546 | throw TEOFException(); | |
547 | } | |
548 | have += get; | |
549 | } | |
550 | ||
551 | return have; | |
552 | } | |
553 | ||
554 | bool TFileTransport::peek() { | |
555 | // check if there is an event ready to be read | |
556 | if (!currentEvent_) { | |
557 | currentEvent_ = readEvent(); | |
558 | } | |
559 | ||
560 | // did not manage to read an event from the file. This could have happened | |
561 | // if the timeout expired or there was some other error | |
562 | if (!currentEvent_) { | |
563 | return false; | |
564 | } | |
565 | ||
566 | // check if there is anything to read | |
567 | return (currentEvent_->eventSize_ - currentEvent_->eventBuffPos_) > 0; | |
568 | } | |
569 | ||
570 | uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) { | |
571 | // check if there an event is ready to be read | |
572 | if (!currentEvent_) { | |
573 | currentEvent_ = readEvent(); | |
574 | } | |
575 | ||
576 | // did not manage to read an event from the file. This could have happened | |
577 | // if the timeout expired or there was some other error | |
578 | if (!currentEvent_) { | |
579 | return 0; | |
580 | } | |
581 | ||
582 | // read as much of the current event as possible | |
583 | int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_; | |
584 | if (remaining <= (int32_t)len) { | |
585 | // copy over anything thats remaining | |
586 | if (remaining > 0) { | |
587 | memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, remaining); | |
588 | } | |
589 | delete (currentEvent_); | |
590 | currentEvent_ = nullptr; | |
591 | return remaining; | |
592 | } | |
593 | ||
594 | // read as much as possible | |
595 | memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len); | |
596 | currentEvent_->eventBuffPos_ += len; | |
597 | return len; | |
598 | } | |
599 | ||
// note caller is responsible for freeing returned events
/**
 * Reads the next complete event from the log file.
 *
 * Framing, as parsed below: a 4-byte size header followed by that many
 * payload bytes.
 *  - A size of 0 marks padding and is skipped.
 *  - A size header is never read across a chunk boundary; bytes before the
 *    boundary are skipped one at a time instead.
 *
 * Reading state:
 *  - readBuff_ is allocated on first use (readBuffSize_ bytes).
 *  - offset_ is advanced by the length of each buffer consumed, so
 *    offset_ + bufferPtr_ is the file position of the next unparsed byte.
 *  - readState_ carries a partially parsed event across refills.
 *
 * At EOF, the behaviour depends on readTimeout_:
 *  - TAIL_READ_TIMEOUT: sleep eofSleepTime_ and retry indefinitely.
 *  - NO_TAIL_READ_TIMEOUT: return nullptr.
 *  - > 0: sleep readTimeout_ * 1000 (THRIFT_SLEEP_USEC, so presumably
 *    readTimeout_ is in ms) and retry once; return nullptr if still at EOF.
 *  - NOTE(review): any other value matches none of these branches and
 *    re-reads immediately without sleeping.
 *
 * @return a heap-allocated event with eventBuffPos_ reset to 0 (the caller
 *         owns it), or nullptr as described above
 * @throws TTransportException on a read error; performRecovery() may also throw
 */
eventInfo* TFileTransport::readEvent() {
  int readTries = 0;

  if (!readBuff_) {
    readBuff_ = new uint8_t[readBuffSize_];
  }

  while (1) {
    // read from the file if read buffer is exhausted
    if (readState_.bufferPtr_ == readState_.bufferLen_) {
      // advance the offset pointer
      offset_ += readState_.bufferLen_;
      readState_.bufferLen_ = static_cast<uint32_t>(::THRIFT_READ(fd_, readBuff_, readBuffSize_));
      // if (readState_.bufferLen_) {
      //   T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
      // }
      readState_.bufferPtr_ = 0;
      readState_.lastDispatchPtr_ = 0;

      // read error: THRIFT_READ's -1 is compared after the uint32_t cast above
      if (readState_.bufferLen_ == -1) {
        readState_.resetAllValues();
        GlobalOutput("TFileTransport: error while reading from file");
        throw TTransportException("TFileTransport: error while reading from file");
      } else if (readState_.bufferLen_ == 0) { // EOF
        // wait indefinitely if there is no timeout
        if (readTimeout_ == TAIL_READ_TIMEOUT) {
          THRIFT_SLEEP_USEC(eofSleepTime_);
          continue;
        } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
          // reset state
          readState_.resetState(0);
          return nullptr;
        } else if (readTimeout_ > 0) {
          // timeout already expired once
          if (readTries > 0) {
            readState_.resetState(0);
            return nullptr;
          } else {
            THRIFT_SLEEP_USEC(readTimeout_ * 1000);
            readTries++;
            continue;
          }
        }
      }
    }

    // Data arrived, so a later EOF gets a fresh timed wait.
    readTries = 0;

    // attempt to read an event from the buffer
    while (readState_.bufferPtr_ < readState_.bufferLen_) {
      if (readState_.readingSize_) {
        // Still collecting the 4-byte size header.
        if (readState_.eventSizeBuffPos_ == 0) {
          // A header must not straddle a chunk boundary.
          if ((offset_ + readState_.bufferPtr_) / chunkSize_
              != ((offset_ + readState_.bufferPtr_ + 3) / chunkSize_)) {
            // skip one byte towards chunk boundary
            // T_DEBUG_L(1, "Skipping a byte");
            readState_.bufferPtr_++;
            continue;
          }
        }

        readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++]
            = readBuff_[readState_.bufferPtr_++];

        if (readState_.eventSizeBuffPos_ == 4) {
          if (readState_.getEventSize() == 0) {
            // 0 length event indicates padding
            // T_DEBUG_L(1, "Got padding");
            readState_.resetState(readState_.lastDispatchPtr_);
            continue;
          }
          // got a valid event
          readState_.readingSize_ = false;
          if (readState_.event_) {
            delete (readState_.event_);
          }
          readState_.event_ = new eventInfo();
          readState_.event_->eventSize_ = readState_.getEventSize();

          // check if the event is corrupted and perform recovery if required
          if (isEventCorrupted()) {
            // performRecovery() either seeks to a chunk or throws.
            performRecovery();
            // start from the top
            break;
          }
        }
      } else {
        // Collecting payload bytes; the event buffer is allocated on the first pass.
        if (!readState_.event_->eventBuff_) {
          readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
          readState_.event_->eventBuffPos_ = 0;
        }
        // take either the entire event or the remaining bytes in the buffer
        int reclaimBuffer = (std::min)((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
                                       readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);

        // copy data from read buffer into event buffer
        memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
               readBuff_ + readState_.bufferPtr_,
               reclaimBuffer);

        // increment position ptrs
        readState_.event_->eventBuffPos_ += reclaimBuffer;
        readState_.bufferPtr_ += reclaimBuffer;

        // check if the event has been read in full
        if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
          // set the completed event to the current event
          eventInfo* completeEvent = readState_.event_;
          completeEvent->eventBuffPos_ = 0;

          // Ownership passes to the caller; the next event starts after this one.
          readState_.event_ = nullptr;
          readState_.resetState(readState_.bufferPtr_);

          // exit criteria
          return completeEvent;
        }
      }
    }
  }
}
722 | ||
723 | bool TFileTransport::isEventCorrupted() { | |
724 | // an error is triggered if: | |
725 | if ((maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) { | |
726 | // 1. Event size is larger than user-speficied max-event size | |
727 | T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)", | |
728 | readState_.event_->eventSize_, | |
729 | maxEventSize_); | |
730 | return true; | |
731 | } else if (readState_.event_->eventSize_ > chunkSize_) { | |
732 | // 2. Event size is larger than chunk size | |
733 | T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)", | |
734 | readState_.event_->eventSize_, | |
735 | chunkSize_); | |
736 | return true; | |
737 | } else if (((offset_ + readState_.bufferPtr_ - 4) / chunkSize_) | |
738 | != ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1) | |
739 | / chunkSize_)) { | |
740 | // 3. size indicates that event crosses chunk boundary | |
741 | T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%lu", | |
742 | readState_.event_->eventSize_, | |
743 | static_cast<unsigned long>(offset_ + readState_.bufferPtr_ + 4)); | |
744 | ||
745 | return true; | |
746 | } | |
747 | ||
748 | return false; | |
749 | } | |
750 | ||
751 | void TFileTransport::performRecovery() { | |
752 | // perform some kickass recovery | |
753 | uint32_t curChunk = getCurChunk(); | |
754 | if (lastBadChunk_ == curChunk) { | |
755 | numCorruptedEventsInChunk_++; | |
756 | } else { | |
757 | lastBadChunk_ = curChunk; | |
758 | numCorruptedEventsInChunk_ = 1; | |
759 | } | |
760 | ||
761 | if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) { | |
762 | // maybe there was an error in reading the file from disk | |
763 | // seek to the beginning of chunk and try again | |
764 | seekToChunk(curChunk); | |
765 | } else { | |
766 | ||
767 | // just skip ahead to the next chunk if we not already at the last chunk | |
768 | if (curChunk != (getNumChunks() - 1)) { | |
769 | seekToChunk(curChunk + 1); | |
770 | } else if (readTimeout_ == TAIL_READ_TIMEOUT) { | |
771 | // if tailing the file, wait until there is enough data to start | |
772 | // the next chunk | |
773 | while (curChunk == (getNumChunks() - 1)) { | |
774 | THRIFT_SLEEP_USEC(corruptedEventSleepTime_); | |
775 | } | |
776 | seekToChunk(curChunk + 1); | |
777 | } else { | |
778 | // pretty hosed at this stage, rewind the file back to the last successful | |
779 | // point and punt on the error | |
780 | readState_.resetState(readState_.lastDispatchPtr_); | |
781 | currentEvent_ = nullptr; | |
782 | char errorMsg[1024]; | |
783 | sprintf(errorMsg, | |
784 | "TFileTransport: log file corrupted at offset: %lu", | |
785 | static_cast<unsigned long>(offset_ + readState_.lastDispatchPtr_)); | |
786 | ||
787 | GlobalOutput(errorMsg); | |
788 | throw TTransportException(errorMsg); | |
789 | } | |
790 | } | |
791 | } | |
792 | ||
793 | void TFileTransport::seekToChunk(int32_t chunk) { | |
794 | if (fd_ <= 0) { | |
795 | throw TTransportException("File not open"); | |
796 | } | |
797 | ||
798 | int32_t numChunks = getNumChunks(); | |
799 | ||
800 | // file is empty, seeking to chunk is pointless | |
801 | if (numChunks == 0) { | |
802 | return; | |
803 | } | |
804 | ||
805 | // negative indicates reverse seek (from the end) | |
806 | if (chunk < 0) { | |
807 | chunk += numChunks; | |
808 | } | |
809 | ||
810 | // too large a value for reverse seek, just seek to beginning | |
811 | if (chunk < 0) { | |
812 | T_DEBUG("%s", "Incorrect value for reverse seek. Seeking to beginning..."); | |
813 | chunk = 0; | |
814 | } | |
815 | ||
816 | // cannot seek past EOF | |
817 | bool seekToEnd = false; | |
818 | off_t minEndOffset = 0; | |
819 | if (chunk >= numChunks) { | |
820 | T_DEBUG("%s", "Trying to seek past EOF. Seeking to EOF instead..."); | |
821 | seekToEnd = true; | |
822 | chunk = numChunks - 1; | |
823 | // this is the min offset to process events till | |
824 | minEndOffset = ::THRIFT_LSEEK(fd_, 0, SEEK_END); | |
825 | } | |
826 | ||
827 | off_t newOffset = off_t(chunk) * chunkSize_; | |
828 | offset_ = ::THRIFT_LSEEK(fd_, newOffset, SEEK_SET); | |
829 | readState_.resetAllValues(); | |
830 | currentEvent_ = nullptr; | |
831 | if (offset_ == -1) { | |
832 | GlobalOutput("TFileTransport: lseek error in seekToChunk"); | |
833 | throw TTransportException("TFileTransport: lseek error in seekToChunk"); | |
834 | } | |
835 | ||
836 | // seek to EOF if user wanted to go to last chunk | |
837 | if (seekToEnd) { | |
838 | uint32_t oldReadTimeout = getReadTimeout(); | |
839 | setReadTimeout(NO_TAIL_READ_TIMEOUT); | |
840 | // keep on reading unti the last event at point of seekChunk call | |
841 | shared_ptr<eventInfo> event; | |
842 | while ((offset_ + readState_.bufferPtr_) < minEndOffset) { | |
843 | event.reset(readEvent()); | |
844 | if (event.get() == nullptr) { | |
845 | break; | |
846 | } | |
847 | } | |
848 | setReadTimeout(oldReadTimeout); | |
849 | } | |
850 | } | |
851 | ||
852 | void TFileTransport::seekToEnd() { | |
853 | seekToChunk(getNumChunks()); | |
854 | } | |
855 | ||
856 | uint32_t TFileTransport::getNumChunks() { | |
857 | if (fd_ <= 0) { | |
858 | return 0; | |
859 | } | |
860 | ||
861 | struct THRIFT_STAT f_info; | |
862 | int rv = ::THRIFT_FSTAT(fd_, &f_info); | |
863 | ||
864 | if (rv < 0) { | |
865 | int errno_copy = THRIFT_ERRNO; | |
866 | throw TTransportException(TTransportException::UNKNOWN, | |
867 | "TFileTransport::getNumChunks() (fstat)", | |
868 | errno_copy); | |
869 | } | |
870 | ||
871 | if (f_info.st_size > 0) { | |
872 | size_t numChunks = ((f_info.st_size) / chunkSize_) + 1; | |
873 | if (numChunks > (std::numeric_limits<uint32_t>::max)()) | |
874 | throw TTransportException("Too many chunks"); | |
875 | return static_cast<uint32_t>(numChunks); | |
876 | } | |
877 | ||
878 | // empty file has no chunks | |
879 | return 0; | |
880 | } | |
881 | ||
882 | uint32_t TFileTransport::getCurChunk() { | |
883 | return static_cast<uint32_t>(offset_ / chunkSize_); | |
884 | } | |
885 | ||
886 | // Utility Functions | |
887 | void TFileTransport::openLogFile() { | |
888 | #ifndef _WIN32 | |
889 | mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; | |
890 | int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND; | |
891 | #else | |
892 | int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE; | |
893 | int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND; | |
894 | #endif | |
895 | fd_ = ::THRIFT_OPEN(filename_.c_str(), flags, mode); | |
896 | offset_ = 0; | |
897 | ||
898 | // make sure open call was successful | |
899 | if (fd_ == -1) { | |
900 | int errno_copy = THRIFT_ERRNO; | |
901 | GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy); | |
902 | throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy); | |
903 | } | |
904 | } | |
905 | ||
906 | std::chrono::time_point<std::chrono::steady_clock> TFileTransport::getNextFlushTime() { | |
907 | return std::chrono::steady_clock::now() + std::chrono::microseconds(flushMaxUs_); | |
908 | } | |
909 | ||
910 | TFileTransportBuffer::TFileTransportBuffer(uint32_t size) | |
911 | : bufferMode_(WRITE), writePoint_(0), readPoint_(0), size_(size) { | |
912 | buffer_ = new eventInfo* [size]; | |
913 | } | |
914 | ||
915 | TFileTransportBuffer::~TFileTransportBuffer() { | |
916 | if (buffer_) { | |
917 | for (uint32_t i = 0; i < writePoint_; i++) { | |
918 | delete buffer_[i]; | |
919 | } | |
920 | delete[] buffer_; | |
921 | buffer_ = nullptr; | |
922 | } | |
923 | } | |
924 | ||
925 | bool TFileTransportBuffer::addEvent(eventInfo* event) { | |
926 | if (bufferMode_ == READ) { | |
927 | GlobalOutput("Trying to write to a buffer in read mode"); | |
928 | } | |
929 | if (writePoint_ < size_) { | |
930 | buffer_[writePoint_++] = event; | |
931 | return true; | |
932 | } else { | |
933 | // buffer is full | |
934 | return false; | |
935 | } | |
936 | } | |
937 | ||
938 | eventInfo* TFileTransportBuffer::getNext() { | |
939 | if (bufferMode_ == WRITE) { | |
940 | bufferMode_ = READ; | |
941 | } | |
942 | if (readPoint_ < writePoint_) { | |
943 | return buffer_[readPoint_++]; | |
944 | } else { | |
945 | // no more entries | |
946 | return nullptr; | |
947 | } | |
948 | } | |
949 | ||
950 | void TFileTransportBuffer::reset() { | |
951 | if (bufferMode_ == WRITE || writePoint_ > readPoint_) { | |
952 | T_DEBUG("%s", "Resetting a buffer with unread entries"); | |
953 | } | |
954 | // Clean up the old entries | |
955 | for (uint32_t i = 0; i < writePoint_; i++) { | |
956 | delete buffer_[i]; | |
957 | } | |
958 | bufferMode_ = WRITE; | |
959 | writePoint_ = 0; | |
960 | readPoint_ = 0; | |
961 | } | |
962 | ||
963 | bool TFileTransportBuffer::isFull() { | |
964 | return writePoint_ == size_; | |
965 | } | |
966 | ||
967 | bool TFileTransportBuffer::isEmpty() { | |
968 | return writePoint_ == 0; | |
969 | } | |
970 | ||
971 | TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor, | |
972 | shared_ptr<TProtocolFactory> protocolFactory, | |
973 | shared_ptr<TFileReaderTransport> inputTransport) | |
974 | : processor_(processor), | |
975 | inputProtocolFactory_(protocolFactory), | |
976 | outputProtocolFactory_(protocolFactory), | |
977 | inputTransport_(inputTransport) { | |
978 | ||
979 | // default the output transport to a null transport (common case) | |
980 | outputTransport_ = std::make_shared<TNullTransport>(); | |
981 | } | |
982 | ||
983 | TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor, | |
984 | shared_ptr<TProtocolFactory> inputProtocolFactory, | |
985 | shared_ptr<TProtocolFactory> outputProtocolFactory, | |
986 | shared_ptr<TFileReaderTransport> inputTransport) | |
987 | : processor_(processor), | |
988 | inputProtocolFactory_(inputProtocolFactory), | |
989 | outputProtocolFactory_(outputProtocolFactory), | |
990 | inputTransport_(inputTransport) { | |
991 | ||
992 | // default the output transport to a null transport (common case) | |
993 | outputTransport_ = std::make_shared<TNullTransport>(); | |
994 | } | |
995 | ||
996 | TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor, | |
997 | shared_ptr<TProtocolFactory> protocolFactory, | |
998 | shared_ptr<TFileReaderTransport> inputTransport, | |
999 | shared_ptr<TTransport> outputTransport) | |
1000 | : processor_(processor), | |
1001 | inputProtocolFactory_(protocolFactory), | |
1002 | outputProtocolFactory_(protocolFactory), | |
1003 | inputTransport_(inputTransport), | |
1004 | outputTransport_(outputTransport) { | |
1005 | } | |
1006 | ||
1007 | void TFileProcessor::process(uint32_t numEvents, bool tail) { | |
1008 | shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_); | |
1009 | shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_); | |
1010 | ||
1011 | // set the read timeout to 0 if tailing is required | |
1012 | int32_t oldReadTimeout = inputTransport_->getReadTimeout(); | |
1013 | if (tail) { | |
1014 | // save old read timeout so it can be restored | |
1015 | inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT); | |
1016 | } | |
1017 | ||
1018 | uint32_t numProcessed = 0; | |
1019 | while (1) { | |
1020 | // bad form to use exceptions for flow control but there is really | |
1021 | // no other way around it | |
1022 | try { | |
1023 | processor_->process(inputProtocol, outputProtocol, nullptr); | |
1024 | numProcessed++; | |
1025 | if ((numEvents > 0) && (numProcessed == numEvents)) { | |
1026 | return; | |
1027 | } | |
1028 | } catch (TEOFException&) { | |
1029 | if (!tail) { | |
1030 | break; | |
1031 | } | |
1032 | } catch (TException& te) { | |
1033 | cerr << te.what() << endl; | |
1034 | break; | |
1035 | } | |
1036 | } | |
1037 | ||
1038 | // restore old read timeout | |
1039 | if (tail) { | |
1040 | inputTransport_->setReadTimeout(oldReadTimeout); | |
1041 | } | |
1042 | } | |
1043 | ||
1044 | void TFileProcessor::processChunk() { | |
1045 | shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_); | |
1046 | shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_); | |
1047 | ||
1048 | uint32_t curChunk = inputTransport_->getCurChunk(); | |
1049 | ||
1050 | while (1) { | |
1051 | // bad form to use exceptions for flow control but there is really | |
1052 | // no other way around it | |
1053 | try { | |
1054 | processor_->process(inputProtocol, outputProtocol, nullptr); | |
1055 | if (curChunk != inputTransport_->getCurChunk()) { | |
1056 | break; | |
1057 | } | |
1058 | } catch (TEOFException&) { | |
1059 | break; | |
1060 | } catch (TException& te) { | |
1061 | cerr << te.what() << endl; | |
1062 | break; | |
1063 | } | |
1064 | } | |
1065 | } | |
1066 | } | |
1067 | } | |
1068 | } // apache::thrift::transport |