// Source: git.proxmox.com, ceph.git (blame view)
// Path: ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/transport/TFileTransport.cpp
// Blame annotation: commit f67539c2 (TL), "buildsys: switch source download to quincy"
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
20#include <thrift/thrift-config.h>
21
22#include <thrift/transport/TFileTransport.h>
23#include <thrift/transport/TTransportUtils.h>
24#include <thrift/transport/PlatformSocket.h>
25#include <thrift/concurrency/FunctionRunner.h>
26
27#include <boost/version.hpp>
28
29#ifdef HAVE_SYS_TIME_H
30#include <sys/time.h>
31#else
32#include <time.h>
33#endif
34#include <fcntl.h>
35#ifdef HAVE_UNISTD_H
36#include <unistd.h>
37#endif
38#ifdef HAVE_STRINGS_H
39#include <strings.h>
40#endif
41#include <cstdlib>
42#include <cstring>
43#include <iostream>
44#include <limits>
45#include <memory>
46#ifdef HAVE_SYS_STAT_H
47#include <sys/stat.h>
48#endif
49
50#ifdef _WIN32
51#include <io.h>
52#endif
53
54namespace apache {
55namespace thrift {
56namespace transport {
57
58using std::shared_ptr;
59using std::cerr;
60using std::cout;
61using std::endl;
62using std::string;
63using namespace apache::thrift::protocol;
64using namespace apache::thrift::concurrency;
65
/**
 * Creates a transport over the log file at `path` and opens it immediately.
 *
 * Only the file is opened here. The event buffers and the writer thread are
 * created lazily by initBufferAndWriteThread() on the first enqueueEvent(),
 * and the read buffer is allocated on the first readEvent().
 *
 * @param path      file to read from or append to
 * @param readOnly  if true, the file is opened O_RDONLY and write() throws
 * @throws TTransportException (NOT_OPEN) if openLogFile() cannot open the file
 */
TFileTransport::TFileTransport(string path, bool readOnly)
  : readState_(),
    readBuff_(nullptr),
    currentEvent_(nullptr),
    readBuffSize_(DEFAULT_READ_BUFF_SIZE),
    readTimeout_(NO_TAIL_READ_TIMEOUT),
    chunkSize_(DEFAULT_CHUNK_SIZE),
    eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE),
    flushMaxUs_(DEFAULT_FLUSH_MAX_US),
    flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES),
    maxEventSize_(DEFAULT_MAX_EVENT_SIZE),
    maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS),
    eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US),
    corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US),
    writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US),
    dequeueBuffer_(nullptr),
    enqueueBuffer_(nullptr),
    // notFull_, notEmpty_ and flushed_ all wait on mutex_, which also guards
    // the enqueue/dequeue buffer swap.
    notFull_(&mutex_),
    notEmpty_(&mutex_),
    closing_(false),
    flushed_(&mutex_),
    forceFlush_(false),
    filename_(path),
    // 0 is this class's "no file open" value; openLogFile() below replaces it.
    fd_(0),
    bufferAndThreadInitialized_(false),
    offset_(0),
    lastBadChunk_(0),
    numCorruptedEventsInChunk_(0),
    readOnly_(readOnly) {
  // The writer thread must stay joinable: the destructor join()s it after
  // setting closing_.
  threadFactory_.setDetached(false);
  openLogFile();
}
98
99void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) {
100 filename_ = filename;
101 offset_ = offset;
102
103 // check if current file is still open
104 if (fd_ > 0) {
105 // flush any events in the queue
106 flush();
107 GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str());
108 if (-1 == ::THRIFT_CLOSE(fd_)) {
109 int errno_copy = THRIFT_ERRNO;
110 GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
111 throw TTransportException(TTransportException::UNKNOWN,
112 "TFileTransport: error in file close",
113 errno_copy);
114 } else {
115 // successfully closed fd
116 fd_ = 0;
117 }
118 }
119
120 if (fd) {
121 fd_ = fd;
122 } else {
123 // open file if the input fd is 0
124 openLogFile();
125 }
126}
127
128TFileTransport::~TFileTransport() {
129 // flush the buffer if a writer thread is active
130 if (writerThread_.get()) {
131 // set state to closing
132 closing_ = true;
133
134 // wake up the writer thread
135 // Since closing_ is true, it will attempt to flush all data, then exit.
136 notEmpty_.notify();
137
138 writerThread_->join();
139 writerThread_.reset();
140 }
141
142 if (dequeueBuffer_) {
143 delete dequeueBuffer_;
144 dequeueBuffer_ = nullptr;
145 }
146
147 if (enqueueBuffer_) {
148 delete enqueueBuffer_;
149 enqueueBuffer_ = nullptr;
150 }
151
152 if (readBuff_) {
153 delete[] readBuff_;
154 readBuff_ = nullptr;
155 }
156
157 if (currentEvent_) {
158 delete currentEvent_;
159 currentEvent_ = nullptr;
160 }
161
162 // close logfile
163 if (fd_ > 0) {
164 if (-1 == ::THRIFT_CLOSE(fd_)) {
165 GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_ERRNO);
166 } else {
167 // successfully closed fd
168 fd_ = 0;
169 }
170 }
171}
172
173bool TFileTransport::initBufferAndWriteThread() {
174 if (bufferAndThreadInitialized_) {
175 T_ERROR("%s", "Trying to double-init TFileTransport");
176 return false;
177 }
178
179 if (!writerThread_.get()) {
180 writerThread_ = threadFactory_.newThread(
181 apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this));
182 writerThread_->start();
183 }
184
185 dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
186 enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
187 bufferAndThreadInitialized_ = true;
188
189 return true;
190}
191
192void TFileTransport::write(const uint8_t* buf, uint32_t len) {
193 if (readOnly_) {
194 throw TTransportException("TFileTransport: attempting to write to file opened readonly");
195 }
196
197 enqueueEvent(buf, len);
198}
199
/**
 * Deleter for std::unique_ptr that releases its pointee with plain `delete`.
 *
 * The template parameter is named `T`: names that begin with an underscore
 * followed by an uppercase letter (such as `_T`) are reserved for the
 * implementation in C++.
 */
template <class T>
struct uniqueDeleter {
  void operator()(T* ptr) const { delete ptr; }
};
205
206void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
207 // can't enqueue more events if file is going to close
208 if (closing_) {
209 return;
210 }
211
212 // make sure that event size is valid
213 if ((maxEventSize_ > 0) && (eventLen > maxEventSize_)) {
214 T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);
215 return;
216 }
217
218 if (eventLen == 0) {
219 T_ERROR("%s", "cannot enqueue an empty event");
220 return;
221 }
222
223 std::unique_ptr<eventInfo, uniqueDeleter<eventInfo> > toEnqueue(new eventInfo());
224 toEnqueue->eventBuff_ = new uint8_t[(sizeof(uint8_t) * eventLen) + 4];
225
226 // first 4 bytes is the event length
227 memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
228 // actual event contents
229 memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
230 toEnqueue->eventSize_ = eventLen + 4;
231
232 // lock mutex
233 Guard g(mutex_);
234
235 // make sure that enqueue buffer is initialized and writer thread is running
236 if (!bufferAndThreadInitialized_) {
237 if (!initBufferAndWriteThread()) {
238 return;
239 }
240 }
241
242 // Can't enqueue while buffer is full
243 while (enqueueBuffer_->isFull()) {
244 notFull_.wait();
245 }
246
247 // We shouldn't be trying to enqueue new data while a forced flush is
248 // requested. (Otherwise the writer thread might not ever be able to finish
249 // the flush if more data keeps being enqueued.)
250 assert(!forceFlush_);
251
252 // add to the buffer
253 eventInfo* pEvent = toEnqueue.release();
254 if (!enqueueBuffer_->addEvent(pEvent)) {
255 delete pEvent;
256 return;
257 }
258
259 // signal anybody who's waiting for the buffer to be non-empty
260 notEmpty_.notify();
261
262 // this really should be a loop where it makes sure it got flushed
263 // because condition variables can get triggered by the os for no reason
264 // it is probably a non-factor for the time being
265}
266
267bool TFileTransport::swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline) {
268 bool swap;
269 Guard g(mutex_);
270
271 if (!enqueueBuffer_->isEmpty()) {
272 swap = true;
273 } else if (closing_) {
274 // even though there is no data to write,
275 // return immediately if the transport is closing
276 swap = false;
277 } else {
278 if (deadline != nullptr) {
279 // if we were handed a deadline time struct, do a timed wait
280 notEmpty_.waitForTime(*deadline);
281 } else {
282 // just wait until the buffer gets an item
283 notEmpty_.wait();
284 }
285
286 // could be empty if we timed out
287 swap = enqueueBuffer_->isEmpty();
288 }
289
290 if (swap) {
291 TFileTransportBuffer* temp = enqueueBuffer_;
292 enqueueBuffer_ = dequeueBuffer_;
293 dequeueBuffer_ = temp;
294 }
295
296 if (swap) {
297 notFull_.notify();
298 }
299
300 return swap;
301}
302
303void TFileTransport::writerThread() {
304 bool hasIOError = false;
305
306 // open file if it is not open
307 if (!fd_) {
308 try {
309 openLogFile();
310 } catch (...) {
311 int errno_copy = THRIFT_ERRNO;
312 GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
313 fd_ = 0;
314 hasIOError = true;
315 }
316 }
317
318 // set the offset to the correct value (EOF)
319 if (!hasIOError) {
320 try {
321 seekToEnd();
322 // throw away any partial events
323 offset_ += readState_.lastDispatchPtr_;
324 if (0 == THRIFT_FTRUNCATE(fd_, offset_)) {
325 readState_.resetAllValues();
326 } else {
327 int errno_copy = THRIFT_ERRNO;
328 GlobalOutput.perror("TFileTransport: writerThread() truncate ", errno_copy);
329 hasIOError = true;
330 }
331 } catch (...) {
332 int errno_copy = THRIFT_ERRNO;
333 GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
334 hasIOError = true;
335 }
336 }
337
338 // Figure out the next time by which a flush must take place
339 auto ts_next_flush = getNextFlushTime();
340 uint32_t unflushed = 0;
341
342 while (1) {
343 // this will only be true when the destructor is being invoked
344 if (closing_) {
345 if (hasIOError) {
346 return;
347 }
348
349 // Try to empty buffers before exit
350 if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
351 ::THRIFT_FSYNC(fd_);
352 if (-1 == ::THRIFT_CLOSE(fd_)) {
353 int errno_copy = THRIFT_ERRNO;
354 GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
355 } else {
356 // fd successfully closed
357 fd_ = 0;
358 }
359 return;
360 }
361 }
362
363 if (swapEventBuffers(&ts_next_flush)) {
364 eventInfo* outEvent;
365 while (nullptr != (outEvent = dequeueBuffer_->getNext())) {
366 // Remove an event from the buffer and write it out to disk. If there is any IO error, for
367 // instance,
368 // the output file is unmounted or deleted, then this event is dropped. However, the writer
369 // thread
370 // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then
371 // start writing
372 // from the end.
373
374 while (hasIOError) {
375 T_ERROR(
376 "TFileTransport: writer thread going to sleep for %u microseconds due to IO errors",
377 writerThreadIOErrorSleepTime_);
378 THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_);
379 if (closing_) {
380 return;
381 }
382 if (!fd_) {
383 ::THRIFT_CLOSE(fd_);
384 fd_ = 0;
385 }
386 try {
387 openLogFile();
388 seekToEnd();
389 unflushed = 0;
390 hasIOError = false;
391 T_LOG_OPER(
392 "TFileTransport: log file %s reopened by writer thread during error recovery",
393 filename_.c_str());
394 } catch (...) {
395 T_ERROR("TFileTransport: unable to reopen log file %s during error recovery",
396 filename_.c_str());
397 }
398 }
399
400 // sanity check on event
401 if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
402 T_ERROR("msg size is greater than max event size: %u > %u\n",
403 outEvent->eventSize_,
404 maxEventSize_);
405 continue;
406 }
407
408 // If chunking is required, then make sure that msg does not cross chunk boundary
409 if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
410 // event size must be less than chunk size
411 if (outEvent->eventSize_ > chunkSize_) {
412 T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event",
413 outEvent->eventSize_,
414 chunkSize_);
415 continue;
416 }
417
418 int64_t chunk1 = offset_ / chunkSize_;
419 int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1) / chunkSize_;
420
421 // if adding this event will cross a chunk boundary, pad the chunk with zeros
422 if (chunk1 != chunk2) {
423 // refetch the offset to keep in sync
424 offset_ = THRIFT_LSEEK(fd_, 0, SEEK_CUR);
425 auto padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);
426
427 auto* zeros = new uint8_t[padding];
428 memset(zeros, '\0', padding);
429 boost::scoped_array<uint8_t> array(zeros);
430 if (-1 == ::THRIFT_WRITE(fd_, zeros, padding)) {
431 int errno_copy = THRIFT_ERRNO;
432 GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ",
433 errno_copy);
434 hasIOError = true;
435 continue;
436 }
437 unflushed += padding;
438 offset_ += padding;
439 }
440 }
441
442 // write the dequeued event to the file
443 if (outEvent->eventSize_ > 0) {
444 if (-1 == ::THRIFT_WRITE(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
445 int errno_copy = THRIFT_ERRNO;
446 GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
447 hasIOError = true;
448 continue;
449 }
450 unflushed += outEvent->eventSize_;
451 offset_ += outEvent->eventSize_;
452 }
453 }
454 dequeueBuffer_->reset();
455 }
456
457 if (hasIOError) {
458 continue;
459 }
460
461 // Local variable to cache the state of forceFlush_.
462 //
463 // We only want to check the value of forceFlush_ once each time around the
464 // loop. If we check it more than once without holding the lock the entire
465 // time, it could have changed state in between. This will result in us
466 // making inconsistent decisions.
467 bool forced_flush = false;
468 {
469 Guard g(mutex_);
470 if (forceFlush_) {
471 if (!enqueueBuffer_->isEmpty()) {
472 // If forceFlush_ is true, we need to flush all available data.
473 // If enqueueBuffer_ is not empty, go back to the start of the loop to
474 // write it out.
475 //
476 // We know the main thread is waiting on forceFlush_ to be cleared,
477 // so no new events will be added to enqueueBuffer_ until we clear
478 // forceFlush_. Therefore the next time around the loop enqueueBuffer_
479 // is guaranteed to be empty. (I.e., we're guaranteed to make progress
480 // and clear forceFlush_ the next time around the loop.)
481 continue;
482 }
483 forced_flush = true;
484 }
485 }
486
487 // determine if we need to perform an fsync
488 bool flush = false;
489 if (forced_flush || unflushed > flushMaxBytes_) {
490 flush = true;
491 } else {
492 if (std::chrono::steady_clock::now() > ts_next_flush) {
493 if (unflushed > 0) {
494 flush = true;
495 } else {
496 // If there is no new data since the last fsync,
497 // don't perform the fsync, but do reset the timer.
498 ts_next_flush = getNextFlushTime();
499 }
500 }
501 }
502
503 if (flush) {
504 // sync (force flush) file to disk
505 THRIFT_FSYNC(fd_);
506 unflushed = 0;
507 ts_next_flush = getNextFlushTime();
508
509 // notify anybody waiting for flush completion
510 if (forced_flush) {
511 Guard g(mutex_);
512 forceFlush_ = false;
513 assert(enqueueBuffer_->isEmpty());
514 assert(dequeueBuffer_->isEmpty());
515 flushed_.notifyAll();
516 }
517 }
518 }
519}
520
521void TFileTransport::flush() {
522 // file must be open for writing for any flushing to take place
523 if (!writerThread_.get()) {
524 return;
525 }
526 // wait for flush to take place
527 Guard g(mutex_);
528
529 // Indicate that we are requesting a flush
530 forceFlush_ = true;
531 // Wake up the writer thread so it will perform the flush immediately
532 notEmpty_.notify();
533
534 while (forceFlush_) {
535 flushed_.wait();
536 }
537}
538
539uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
540 uint32_t have = 0;
541 uint32_t get = 0;
542
543 while (have < len) {
544 get = read(buf + have, len - have);
545 if (get <= 0) {
546 throw TEOFException();
547 }
548 have += get;
549 }
550
551 return have;
552}
553
554bool TFileTransport::peek() {
555 // check if there is an event ready to be read
556 if (!currentEvent_) {
557 currentEvent_ = readEvent();
558 }
559
560 // did not manage to read an event from the file. This could have happened
561 // if the timeout expired or there was some other error
562 if (!currentEvent_) {
563 return false;
564 }
565
566 // check if there is anything to read
567 return (currentEvent_->eventSize_ - currentEvent_->eventBuffPos_) > 0;
568}
569
570uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
571 // check if there an event is ready to be read
572 if (!currentEvent_) {
573 currentEvent_ = readEvent();
574 }
575
576 // did not manage to read an event from the file. This could have happened
577 // if the timeout expired or there was some other error
578 if (!currentEvent_) {
579 return 0;
580 }
581
582 // read as much of the current event as possible
583 int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
584 if (remaining <= (int32_t)len) {
585 // copy over anything thats remaining
586 if (remaining > 0) {
587 memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, remaining);
588 }
589 delete (currentEvent_);
590 currentEvent_ = nullptr;
591 return remaining;
592 }
593
594 // read as much as possible
595 memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
596 currentEvent_->eventBuffPos_ += len;
597 return len;
598}
599
600// note caller is responsible for freeing returned events
601eventInfo* TFileTransport::readEvent() {
602 int readTries = 0;
603
604 if (!readBuff_) {
605 readBuff_ = new uint8_t[readBuffSize_];
606 }
607
608 while (1) {
609 // read from the file if read buffer is exhausted
610 if (readState_.bufferPtr_ == readState_.bufferLen_) {
611 // advance the offset pointer
612 offset_ += readState_.bufferLen_;
613 readState_.bufferLen_ = static_cast<uint32_t>(::THRIFT_READ(fd_, readBuff_, readBuffSize_));
614 // if (readState_.bufferLen_) {
615 // T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
616 // }
617 readState_.bufferPtr_ = 0;
618 readState_.lastDispatchPtr_ = 0;
619
620 // read error
621 if (readState_.bufferLen_ == -1) {
622 readState_.resetAllValues();
623 GlobalOutput("TFileTransport: error while reading from file");
624 throw TTransportException("TFileTransport: error while reading from file");
625 } else if (readState_.bufferLen_ == 0) { // EOF
626 // wait indefinitely if there is no timeout
627 if (readTimeout_ == TAIL_READ_TIMEOUT) {
628 THRIFT_SLEEP_USEC(eofSleepTime_);
629 continue;
630 } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
631 // reset state
632 readState_.resetState(0);
633 return nullptr;
634 } else if (readTimeout_ > 0) {
635 // timeout already expired once
636 if (readTries > 0) {
637 readState_.resetState(0);
638 return nullptr;
639 } else {
640 THRIFT_SLEEP_USEC(readTimeout_ * 1000);
641 readTries++;
642 continue;
643 }
644 }
645 }
646 }
647
648 readTries = 0;
649
650 // attempt to read an event from the buffer
651 while (readState_.bufferPtr_ < readState_.bufferLen_) {
652 if (readState_.readingSize_) {
653 if (readState_.eventSizeBuffPos_ == 0) {
654 if ((offset_ + readState_.bufferPtr_) / chunkSize_
655 != ((offset_ + readState_.bufferPtr_ + 3) / chunkSize_)) {
656 // skip one byte towards chunk boundary
657 // T_DEBUG_L(1, "Skipping a byte");
658 readState_.bufferPtr_++;
659 continue;
660 }
661 }
662
663 readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++]
664 = readBuff_[readState_.bufferPtr_++];
665
666 if (readState_.eventSizeBuffPos_ == 4) {
667 if (readState_.getEventSize() == 0) {
668 // 0 length event indicates padding
669 // T_DEBUG_L(1, "Got padding");
670 readState_.resetState(readState_.lastDispatchPtr_);
671 continue;
672 }
673 // got a valid event
674 readState_.readingSize_ = false;
675 if (readState_.event_) {
676 delete (readState_.event_);
677 }
678 readState_.event_ = new eventInfo();
679 readState_.event_->eventSize_ = readState_.getEventSize();
680
681 // check if the event is corrupted and perform recovery if required
682 if (isEventCorrupted()) {
683 performRecovery();
684 // start from the top
685 break;
686 }
687 }
688 } else {
689 if (!readState_.event_->eventBuff_) {
690 readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
691 readState_.event_->eventBuffPos_ = 0;
692 }
693 // take either the entire event or the remaining bytes in the buffer
694 int reclaimBuffer = (std::min)((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
695 readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);
696
697 // copy data from read buffer into event buffer
698 memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
699 readBuff_ + readState_.bufferPtr_,
700 reclaimBuffer);
701
702 // increment position ptrs
703 readState_.event_->eventBuffPos_ += reclaimBuffer;
704 readState_.bufferPtr_ += reclaimBuffer;
705
706 // check if the event has been read in full
707 if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
708 // set the completed event to the current event
709 eventInfo* completeEvent = readState_.event_;
710 completeEvent->eventBuffPos_ = 0;
711
712 readState_.event_ = nullptr;
713 readState_.resetState(readState_.bufferPtr_);
714
715 // exit criteria
716 return completeEvent;
717 }
718 }
719 }
720 }
721}
722
723bool TFileTransport::isEventCorrupted() {
724 // an error is triggered if:
725 if ((maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
726 // 1. Event size is larger than user-speficied max-event size
727 T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
728 readState_.event_->eventSize_,
729 maxEventSize_);
730 return true;
731 } else if (readState_.event_->eventSize_ > chunkSize_) {
732 // 2. Event size is larger than chunk size
733 T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
734 readState_.event_->eventSize_,
735 chunkSize_);
736 return true;
737 } else if (((offset_ + readState_.bufferPtr_ - 4) / chunkSize_)
738 != ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)
739 / chunkSize_)) {
740 // 3. size indicates that event crosses chunk boundary
741 T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%lu",
742 readState_.event_->eventSize_,
743 static_cast<unsigned long>(offset_ + readState_.bufferPtr_ + 4));
744
745 return true;
746 }
747
748 return false;
749}
750
751void TFileTransport::performRecovery() {
752 // perform some kickass recovery
753 uint32_t curChunk = getCurChunk();
754 if (lastBadChunk_ == curChunk) {
755 numCorruptedEventsInChunk_++;
756 } else {
757 lastBadChunk_ = curChunk;
758 numCorruptedEventsInChunk_ = 1;
759 }
760
761 if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
762 // maybe there was an error in reading the file from disk
763 // seek to the beginning of chunk and try again
764 seekToChunk(curChunk);
765 } else {
766
767 // just skip ahead to the next chunk if we not already at the last chunk
768 if (curChunk != (getNumChunks() - 1)) {
769 seekToChunk(curChunk + 1);
770 } else if (readTimeout_ == TAIL_READ_TIMEOUT) {
771 // if tailing the file, wait until there is enough data to start
772 // the next chunk
773 while (curChunk == (getNumChunks() - 1)) {
774 THRIFT_SLEEP_USEC(corruptedEventSleepTime_);
775 }
776 seekToChunk(curChunk + 1);
777 } else {
778 // pretty hosed at this stage, rewind the file back to the last successful
779 // point and punt on the error
780 readState_.resetState(readState_.lastDispatchPtr_);
781 currentEvent_ = nullptr;
782 char errorMsg[1024];
783 sprintf(errorMsg,
784 "TFileTransport: log file corrupted at offset: %lu",
785 static_cast<unsigned long>(offset_ + readState_.lastDispatchPtr_));
786
787 GlobalOutput(errorMsg);
788 throw TTransportException(errorMsg);
789 }
790 }
791}
792
793void TFileTransport::seekToChunk(int32_t chunk) {
794 if (fd_ <= 0) {
795 throw TTransportException("File not open");
796 }
797
798 int32_t numChunks = getNumChunks();
799
800 // file is empty, seeking to chunk is pointless
801 if (numChunks == 0) {
802 return;
803 }
804
805 // negative indicates reverse seek (from the end)
806 if (chunk < 0) {
807 chunk += numChunks;
808 }
809
810 // too large a value for reverse seek, just seek to beginning
811 if (chunk < 0) {
812 T_DEBUG("%s", "Incorrect value for reverse seek. Seeking to beginning...");
813 chunk = 0;
814 }
815
816 // cannot seek past EOF
817 bool seekToEnd = false;
818 off_t minEndOffset = 0;
819 if (chunk >= numChunks) {
820 T_DEBUG("%s", "Trying to seek past EOF. Seeking to EOF instead...");
821 seekToEnd = true;
822 chunk = numChunks - 1;
823 // this is the min offset to process events till
824 minEndOffset = ::THRIFT_LSEEK(fd_, 0, SEEK_END);
825 }
826
827 off_t newOffset = off_t(chunk) * chunkSize_;
828 offset_ = ::THRIFT_LSEEK(fd_, newOffset, SEEK_SET);
829 readState_.resetAllValues();
830 currentEvent_ = nullptr;
831 if (offset_ == -1) {
832 GlobalOutput("TFileTransport: lseek error in seekToChunk");
833 throw TTransportException("TFileTransport: lseek error in seekToChunk");
834 }
835
836 // seek to EOF if user wanted to go to last chunk
837 if (seekToEnd) {
838 uint32_t oldReadTimeout = getReadTimeout();
839 setReadTimeout(NO_TAIL_READ_TIMEOUT);
840 // keep on reading unti the last event at point of seekChunk call
841 shared_ptr<eventInfo> event;
842 while ((offset_ + readState_.bufferPtr_) < minEndOffset) {
843 event.reset(readEvent());
844 if (event.get() == nullptr) {
845 break;
846 }
847 }
848 setReadTimeout(oldReadTimeout);
849 }
850}
851
852void TFileTransport::seekToEnd() {
853 seekToChunk(getNumChunks());
854}
855
856uint32_t TFileTransport::getNumChunks() {
857 if (fd_ <= 0) {
858 return 0;
859 }
860
861 struct THRIFT_STAT f_info;
862 int rv = ::THRIFT_FSTAT(fd_, &f_info);
863
864 if (rv < 0) {
865 int errno_copy = THRIFT_ERRNO;
866 throw TTransportException(TTransportException::UNKNOWN,
867 "TFileTransport::getNumChunks() (fstat)",
868 errno_copy);
869 }
870
871 if (f_info.st_size > 0) {
872 size_t numChunks = ((f_info.st_size) / chunkSize_) + 1;
873 if (numChunks > (std::numeric_limits<uint32_t>::max)())
874 throw TTransportException("Too many chunks");
875 return static_cast<uint32_t>(numChunks);
876 }
877
878 // empty file has no chunks
879 return 0;
880}
881
882uint32_t TFileTransport::getCurChunk() {
883 return static_cast<uint32_t>(offset_ / chunkSize_);
884}
885
886// Utility Functions
887void TFileTransport::openLogFile() {
888#ifndef _WIN32
889 mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
890 int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
891#else
892 int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE;
893 int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND;
894#endif
895 fd_ = ::THRIFT_OPEN(filename_.c_str(), flags, mode);
896 offset_ = 0;
897
898 // make sure open call was successful
899 if (fd_ == -1) {
900 int errno_copy = THRIFT_ERRNO;
901 GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);
902 throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
903 }
904}
905
906std::chrono::time_point<std::chrono::steady_clock> TFileTransport::getNextFlushTime() {
907 return std::chrono::steady_clock::now() + std::chrono::microseconds(flushMaxUs_);
908}
909
910TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
911 : bufferMode_(WRITE), writePoint_(0), readPoint_(0), size_(size) {
912 buffer_ = new eventInfo* [size];
913}
914
915TFileTransportBuffer::~TFileTransportBuffer() {
916 if (buffer_) {
917 for (uint32_t i = 0; i < writePoint_; i++) {
918 delete buffer_[i];
919 }
920 delete[] buffer_;
921 buffer_ = nullptr;
922 }
923}
924
925bool TFileTransportBuffer::addEvent(eventInfo* event) {
926 if (bufferMode_ == READ) {
927 GlobalOutput("Trying to write to a buffer in read mode");
928 }
929 if (writePoint_ < size_) {
930 buffer_[writePoint_++] = event;
931 return true;
932 } else {
933 // buffer is full
934 return false;
935 }
936}
937
938eventInfo* TFileTransportBuffer::getNext() {
939 if (bufferMode_ == WRITE) {
940 bufferMode_ = READ;
941 }
942 if (readPoint_ < writePoint_) {
943 return buffer_[readPoint_++];
944 } else {
945 // no more entries
946 return nullptr;
947 }
948}
949
950void TFileTransportBuffer::reset() {
951 if (bufferMode_ == WRITE || writePoint_ > readPoint_) {
952 T_DEBUG("%s", "Resetting a buffer with unread entries");
953 }
954 // Clean up the old entries
955 for (uint32_t i = 0; i < writePoint_; i++) {
956 delete buffer_[i];
957 }
958 bufferMode_ = WRITE;
959 writePoint_ = 0;
960 readPoint_ = 0;
961}
962
963bool TFileTransportBuffer::isFull() {
964 return writePoint_ == size_;
965}
966
967bool TFileTransportBuffer::isEmpty() {
968 return writePoint_ == 0;
969}
970
971TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
972 shared_ptr<TProtocolFactory> protocolFactory,
973 shared_ptr<TFileReaderTransport> inputTransport)
974 : processor_(processor),
975 inputProtocolFactory_(protocolFactory),
976 outputProtocolFactory_(protocolFactory),
977 inputTransport_(inputTransport) {
978
979 // default the output transport to a null transport (common case)
980 outputTransport_ = std::make_shared<TNullTransport>();
981}
982
983TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
984 shared_ptr<TProtocolFactory> inputProtocolFactory,
985 shared_ptr<TProtocolFactory> outputProtocolFactory,
986 shared_ptr<TFileReaderTransport> inputTransport)
987 : processor_(processor),
988 inputProtocolFactory_(inputProtocolFactory),
989 outputProtocolFactory_(outputProtocolFactory),
990 inputTransport_(inputTransport) {
991
992 // default the output transport to a null transport (common case)
993 outputTransport_ = std::make_shared<TNullTransport>();
994}
995
996TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
997 shared_ptr<TProtocolFactory> protocolFactory,
998 shared_ptr<TFileReaderTransport> inputTransport,
999 shared_ptr<TTransport> outputTransport)
1000 : processor_(processor),
1001 inputProtocolFactory_(protocolFactory),
1002 outputProtocolFactory_(protocolFactory),
1003 inputTransport_(inputTransport),
1004 outputTransport_(outputTransport) {
1005}
1006
1007void TFileProcessor::process(uint32_t numEvents, bool tail) {
1008 shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
1009 shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
1010
1011 // set the read timeout to 0 if tailing is required
1012 int32_t oldReadTimeout = inputTransport_->getReadTimeout();
1013 if (tail) {
1014 // save old read timeout so it can be restored
1015 inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);
1016 }
1017
1018 uint32_t numProcessed = 0;
1019 while (1) {
1020 // bad form to use exceptions for flow control but there is really
1021 // no other way around it
1022 try {
1023 processor_->process(inputProtocol, outputProtocol, nullptr);
1024 numProcessed++;
1025 if ((numEvents > 0) && (numProcessed == numEvents)) {
1026 return;
1027 }
1028 } catch (TEOFException&) {
1029 if (!tail) {
1030 break;
1031 }
1032 } catch (TException& te) {
1033 cerr << te.what() << endl;
1034 break;
1035 }
1036 }
1037
1038 // restore old read timeout
1039 if (tail) {
1040 inputTransport_->setReadTimeout(oldReadTimeout);
1041 }
1042}
1043
1044void TFileProcessor::processChunk() {
1045 shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
1046 shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
1047
1048 uint32_t curChunk = inputTransport_->getCurChunk();
1049
1050 while (1) {
1051 // bad form to use exceptions for flow control but there is really
1052 // no other way around it
1053 try {
1054 processor_->process(inputProtocol, outputProtocol, nullptr);
1055 if (curChunk != inputTransport_->getCurChunk()) {
1056 break;
1057 }
1058 } catch (TEOFException&) {
1059 break;
1060 } catch (TException& te) {
1061 cerr << te.what() << endl;
1062 break;
1063 }
1064 }
1065}
1066}
1067}
1068} // apache::thrift::transport