]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/transport/TFileTransport.cpp
import quincy beta 17.1.0
[ceph.git] / ceph / src / jaegertracing / thrift / lib / cpp / src / thrift / transport / TFileTransport.cpp
diff --git a/ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/transport/TFileTransport.cpp b/ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/transport/TFileTransport.cpp
deleted file mode 100644 (file)
index eaf2bc3..0000000
+++ /dev/null
@@ -1,1068 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <thrift/thrift-config.h>
-
-#include <thrift/transport/TFileTransport.h>
-#include <thrift/transport/TTransportUtils.h>
-#include <thrift/transport/PlatformSocket.h>
-#include <thrift/concurrency/FunctionRunner.h>
-
-#include <boost/version.hpp>
-
-#ifdef HAVE_SYS_TIME_H
-#include <sys/time.h>
-#else
-#include <time.h>
-#endif
-#include <fcntl.h>
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-#ifdef HAVE_STRINGS_H
-#include <strings.h>
-#endif
-#include <cstdlib>
-#include <cstring>
-#include <iostream>
-#include <limits>
-#include <memory>
-#ifdef HAVE_SYS_STAT_H
-#include <sys/stat.h>
-#endif
-
-#ifdef _WIN32
-#include <io.h>
-#endif
-
-namespace apache {
-namespace thrift {
-namespace transport {
-
-using std::shared_ptr;
-using std::cerr;
-using std::cout;
-using std::endl;
-using std::string;
-using namespace apache::thrift::protocol;
-using namespace apache::thrift::concurrency;
-
-TFileTransport::TFileTransport(string path, bool readOnly)
-  : readState_(),
-    readBuff_(nullptr),
-    currentEvent_(nullptr),
-    readBuffSize_(DEFAULT_READ_BUFF_SIZE),
-    readTimeout_(NO_TAIL_READ_TIMEOUT),
-    chunkSize_(DEFAULT_CHUNK_SIZE),
-    eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE),
-    flushMaxUs_(DEFAULT_FLUSH_MAX_US),
-    flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES),
-    maxEventSize_(DEFAULT_MAX_EVENT_SIZE),
-    maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS),
-    eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US),
-    corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US),
-    writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US),
-    dequeueBuffer_(nullptr),
-    enqueueBuffer_(nullptr),
-    notFull_(&mutex_),
-    notEmpty_(&mutex_),
-    closing_(false),
-    flushed_(&mutex_),
-    forceFlush_(false),
-    filename_(path),
-    fd_(0),
-    bufferAndThreadInitialized_(false),
-    offset_(0),
-    lastBadChunk_(0),
-    numCorruptedEventsInChunk_(0),
-    readOnly_(readOnly) {
-  threadFactory_.setDetached(false);
-  openLogFile();
-}
-
-void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) {
-  filename_ = filename;
-  offset_ = offset;
-
-  // check if current file is still open
-  if (fd_ > 0) {
-    // flush any events in the queue
-    flush();
-    GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str());
-    if (-1 == ::THRIFT_CLOSE(fd_)) {
-      int errno_copy = THRIFT_ERRNO;
-      GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
-      throw TTransportException(TTransportException::UNKNOWN,
-                                "TFileTransport: error in file close",
-                                errno_copy);
-    } else {
-      // successfully closed fd
-      fd_ = 0;
-    }
-  }
-
-  if (fd) {
-    fd_ = fd;
-  } else {
-    // open file if the input fd is 0
-    openLogFile();
-  }
-}
-
-TFileTransport::~TFileTransport() {
-  // flush the buffer if a writer thread is active
-  if (writerThread_.get()) {
-    // set state to closing
-    closing_ = true;
-
-    // wake up the writer thread
-    // Since closing_ is true, it will attempt to flush all data, then exit.
-    notEmpty_.notify();
-
-    writerThread_->join();
-    writerThread_.reset();
-  }
-
-  if (dequeueBuffer_) {
-    delete dequeueBuffer_;
-    dequeueBuffer_ = nullptr;
-  }
-
-  if (enqueueBuffer_) {
-    delete enqueueBuffer_;
-    enqueueBuffer_ = nullptr;
-  }
-
-  if (readBuff_) {
-    delete[] readBuff_;
-    readBuff_ = nullptr;
-  }
-
-  if (currentEvent_) {
-    delete currentEvent_;
-    currentEvent_ = nullptr;
-  }
-
-  // close logfile
-  if (fd_ > 0) {
-    if (-1 == ::THRIFT_CLOSE(fd_)) {
-      GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_ERRNO);
-    } else {
-      // successfully closed fd
-      fd_ = 0;
-    }
-  }
-}
-
-bool TFileTransport::initBufferAndWriteThread() {
-  if (bufferAndThreadInitialized_) {
-    T_ERROR("%s", "Trying to double-init TFileTransport");
-    return false;
-  }
-
-  if (!writerThread_.get()) {
-    writerThread_ = threadFactory_.newThread(
-        apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this));
-    writerThread_->start();
-  }
-
-  dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
-  enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
-  bufferAndThreadInitialized_ = true;
-
-  return true;
-}
-
-void TFileTransport::write(const uint8_t* buf, uint32_t len) {
-  if (readOnly_) {
-    throw TTransportException("TFileTransport: attempting to write to file opened readonly");
-  }
-
-  enqueueEvent(buf, len);
-}
-
-template <class _T>
-struct uniqueDeleter
-{
-  void operator()(_T *ptr) const { delete ptr; }
-};
-
-void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
-  // can't enqueue more events if file is going to close
-  if (closing_) {
-    return;
-  }
-
-  // make sure that event size is valid
-  if ((maxEventSize_ > 0) && (eventLen > maxEventSize_)) {
-    T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);
-    return;
-  }
-
-  if (eventLen == 0) {
-    T_ERROR("%s", "cannot enqueue an empty event");
-    return;
-  }
-
-  std::unique_ptr<eventInfo, uniqueDeleter<eventInfo> > toEnqueue(new eventInfo());
-  toEnqueue->eventBuff_ = new uint8_t[(sizeof(uint8_t) * eventLen) + 4];
-
-  // first 4 bytes is the event length
-  memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
-  // actual event contents
-  memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
-  toEnqueue->eventSize_ = eventLen + 4;
-
-  // lock mutex
-  Guard g(mutex_);
-
-  // make sure that enqueue buffer is initialized and writer thread is running
-  if (!bufferAndThreadInitialized_) {
-    if (!initBufferAndWriteThread()) {
-      return;
-    }
-  }
-
-  // Can't enqueue while buffer is full
-  while (enqueueBuffer_->isFull()) {
-    notFull_.wait();
-  }
-
-  // We shouldn't be trying to enqueue new data while a forced flush is
-  // requested.  (Otherwise the writer thread might not ever be able to finish
-  // the flush if more data keeps being enqueued.)
-  assert(!forceFlush_);
-
-  // add to the buffer
-  eventInfo* pEvent = toEnqueue.release();
-  if (!enqueueBuffer_->addEvent(pEvent)) {
-    delete pEvent;
-    return;
-  }
-
-  // signal anybody who's waiting for the buffer to be non-empty
-  notEmpty_.notify();
-
-  // this really should be a loop where it makes sure it got flushed
-  // because condition variables can get triggered by the os for no reason
-  // it is probably a non-factor for the time being
-}
-
-bool TFileTransport::swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline) {
-  bool swap;
-  Guard g(mutex_);
-
-  if (!enqueueBuffer_->isEmpty()) {
-    swap = true;
-  } else if (closing_) {
-    // even though there is no data to write,
-    // return immediately if the transport is closing
-    swap = false;
-  } else {
-    if (deadline != nullptr) {
-      // if we were handed a deadline time struct, do a timed wait
-      notEmpty_.waitForTime(*deadline);
-    } else {
-      // just wait until the buffer gets an item
-      notEmpty_.wait();
-    }
-
-    // could be empty if we timed out
-    swap = enqueueBuffer_->isEmpty();
-  }
-
-  if (swap) {
-    TFileTransportBuffer* temp = enqueueBuffer_;
-    enqueueBuffer_ = dequeueBuffer_;
-    dequeueBuffer_ = temp;
-  }
-
-  if (swap) {
-    notFull_.notify();
-  }
-
-  return swap;
-}
-
-void TFileTransport::writerThread() {
-  bool hasIOError = false;
-
-  // open file if it is not open
-  if (!fd_) {
-    try {
-      openLogFile();
-    } catch (...) {
-      int errno_copy = THRIFT_ERRNO;
-      GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
-      fd_ = 0;
-      hasIOError = true;
-    }
-  }
-
-  // set the offset to the correct value (EOF)
-  if (!hasIOError) {
-    try {
-      seekToEnd();
-      // throw away any partial events
-      offset_ += readState_.lastDispatchPtr_;
-      if (0 == THRIFT_FTRUNCATE(fd_, offset_)) {
-        readState_.resetAllValues();
-      } else {
-        int errno_copy = THRIFT_ERRNO;
-        GlobalOutput.perror("TFileTransport: writerThread() truncate ", errno_copy);
-        hasIOError = true;
-      }
-    } catch (...) {
-      int errno_copy = THRIFT_ERRNO;
-      GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
-      hasIOError = true;
-    }
-  }
-
-  // Figure out the next time by which a flush must take place
-  auto ts_next_flush = getNextFlushTime();
-  uint32_t unflushed = 0;
-
-  while (1) {
-    // this will only be true when the destructor is being invoked
-    if (closing_) {
-      if (hasIOError) {
-        return;
-      }
-
-      // Try to empty buffers before exit
-      if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
-        ::THRIFT_FSYNC(fd_);
-        if (-1 == ::THRIFT_CLOSE(fd_)) {
-          int errno_copy = THRIFT_ERRNO;
-          GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
-        } else {
-          // fd successfully closed
-          fd_ = 0;
-        }
-        return;
-      }
-    }
-
-    if (swapEventBuffers(&ts_next_flush)) {
-      eventInfo* outEvent;
-      while (nullptr != (outEvent = dequeueBuffer_->getNext())) {
-        // Remove an event from the buffer and write it out to disk. If there is any IO error, for
-        // instance,
-        // the output file is unmounted or deleted, then this event is dropped. However, the writer
-        // thread
-        // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then
-        // start writing
-        // from the end.
-
-        while (hasIOError) {
-          T_ERROR(
-              "TFileTransport: writer thread going to sleep for %u microseconds due to IO errors",
-              writerThreadIOErrorSleepTime_);
-          THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_);
-          if (closing_) {
-            return;
-          }
-          if (!fd_) {
-            ::THRIFT_CLOSE(fd_);
-            fd_ = 0;
-          }
-          try {
-            openLogFile();
-            seekToEnd();
-            unflushed = 0;
-            hasIOError = false;
-            T_LOG_OPER(
-                "TFileTransport: log file %s reopened by writer thread during error recovery",
-                filename_.c_str());
-          } catch (...) {
-            T_ERROR("TFileTransport: unable to reopen log file %s during error recovery",
-                    filename_.c_str());
-          }
-        }
-
-        // sanity check on event
-        if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
-          T_ERROR("msg size is greater than max event size: %u > %u\n",
-                  outEvent->eventSize_,
-                  maxEventSize_);
-          continue;
-        }
-
-        // If chunking is required, then make sure that msg does not cross chunk boundary
-        if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
-          // event size must be less than chunk size
-          if (outEvent->eventSize_ > chunkSize_) {
-            T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event",
-                    outEvent->eventSize_,
-                    chunkSize_);
-            continue;
-          }
-
-          int64_t chunk1 = offset_ / chunkSize_;
-          int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1) / chunkSize_;
-
-          // if adding this event will cross a chunk boundary, pad the chunk with zeros
-          if (chunk1 != chunk2) {
-            // refetch the offset to keep in sync
-            offset_ = THRIFT_LSEEK(fd_, 0, SEEK_CUR);
-            auto padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);
-
-            auto* zeros = new uint8_t[padding];
-            memset(zeros, '\0', padding);
-            boost::scoped_array<uint8_t> array(zeros);
-            if (-1 == ::THRIFT_WRITE(fd_, zeros, padding)) {
-              int errno_copy = THRIFT_ERRNO;
-              GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ",
-                                  errno_copy);
-              hasIOError = true;
-              continue;
-            }
-            unflushed += padding;
-            offset_ += padding;
-          }
-        }
-
-        // write the dequeued event to the file
-        if (outEvent->eventSize_ > 0) {
-          if (-1 == ::THRIFT_WRITE(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
-            int errno_copy = THRIFT_ERRNO;
-            GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
-            hasIOError = true;
-            continue;
-          }
-          unflushed += outEvent->eventSize_;
-          offset_ += outEvent->eventSize_;
-        }
-      }
-      dequeueBuffer_->reset();
-    }
-
-    if (hasIOError) {
-      continue;
-    }
-
-    // Local variable to cache the state of forceFlush_.
-    //
-    // We only want to check the value of forceFlush_ once each time around the
-    // loop.  If we check it more than once without holding the lock the entire
-    // time, it could have changed state in between.  This will result in us
-    // making inconsistent decisions.
-    bool forced_flush = false;
-    {
-      Guard g(mutex_);
-      if (forceFlush_) {
-        if (!enqueueBuffer_->isEmpty()) {
-          // If forceFlush_ is true, we need to flush all available data.
-          // If enqueueBuffer_ is not empty, go back to the start of the loop to
-          // write it out.
-          //
-          // We know the main thread is waiting on forceFlush_ to be cleared,
-          // so no new events will be added to enqueueBuffer_ until we clear
-          // forceFlush_.  Therefore the next time around the loop enqueueBuffer_
-          // is guaranteed to be empty.  (I.e., we're guaranteed to make progress
-          // and clear forceFlush_ the next time around the loop.)
-          continue;
-        }
-        forced_flush = true;
-      }
-    }
-
-    // determine if we need to perform an fsync
-    bool flush = false;
-    if (forced_flush || unflushed > flushMaxBytes_) {
-      flush = true;
-    } else {
-      if (std::chrono::steady_clock::now() > ts_next_flush) {
-        if (unflushed > 0) {
-          flush = true;
-        } else {
-          // If there is no new data since the last fsync,
-          // don't perform the fsync, but do reset the timer.
-          ts_next_flush = getNextFlushTime();
-        }
-      }
-    }
-
-    if (flush) {
-      // sync (force flush) file to disk
-      THRIFT_FSYNC(fd_);
-      unflushed = 0;
-      ts_next_flush = getNextFlushTime();
-
-      // notify anybody waiting for flush completion
-      if (forced_flush) {
-        Guard g(mutex_);
-        forceFlush_ = false;
-        assert(enqueueBuffer_->isEmpty());
-        assert(dequeueBuffer_->isEmpty());
-        flushed_.notifyAll();
-      }
-    }
-  }
-}
-
-void TFileTransport::flush() {
-  // file must be open for writing for any flushing to take place
-  if (!writerThread_.get()) {
-    return;
-  }
-  // wait for flush to take place
-  Guard g(mutex_);
-
-  // Indicate that we are requesting a flush
-  forceFlush_ = true;
-  // Wake up the writer thread so it will perform the flush immediately
-  notEmpty_.notify();
-
-  while (forceFlush_) {
-    flushed_.wait();
-  }
-}
-
-uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
-  uint32_t have = 0;
-  uint32_t get = 0;
-
-  while (have < len) {
-    get = read(buf + have, len - have);
-    if (get <= 0) {
-      throw TEOFException();
-    }
-    have += get;
-  }
-
-  return have;
-}
-
-bool TFileTransport::peek() {
-  // check if there is an event ready to be read
-  if (!currentEvent_) {
-    currentEvent_ = readEvent();
-  }
-
-  // did not manage to read an event from the file. This could have happened
-  // if the timeout expired or there was some other error
-  if (!currentEvent_) {
-    return false;
-  }
-
-  // check if there is anything to read
-  return (currentEvent_->eventSize_ - currentEvent_->eventBuffPos_) > 0;
-}
-
-uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
-  // check if there an event is ready to be read
-  if (!currentEvent_) {
-    currentEvent_ = readEvent();
-  }
-
-  // did not manage to read an event from the file. This could have happened
-  // if the timeout expired or there was some other error
-  if (!currentEvent_) {
-    return 0;
-  }
-
-  // read as much of the current event as possible
-  int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
-  if (remaining <= (int32_t)len) {
-    // copy over anything thats remaining
-    if (remaining > 0) {
-      memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, remaining);
-    }
-    delete (currentEvent_);
-    currentEvent_ = nullptr;
-    return remaining;
-  }
-
-  // read as much as possible
-  memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
-  currentEvent_->eventBuffPos_ += len;
-  return len;
-}
-
-// note caller is responsible for freeing returned events
-eventInfo* TFileTransport::readEvent() {
-  int readTries = 0;
-
-  if (!readBuff_) {
-    readBuff_ = new uint8_t[readBuffSize_];
-  }
-
-  while (1) {
-    // read from the file if read buffer is exhausted
-    if (readState_.bufferPtr_ == readState_.bufferLen_) {
-      // advance the offset pointer
-      offset_ += readState_.bufferLen_;
-      readState_.bufferLen_ = static_cast<uint32_t>(::THRIFT_READ(fd_, readBuff_, readBuffSize_));
-      //       if (readState_.bufferLen_) {
-      //         T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
-      //       }
-      readState_.bufferPtr_ = 0;
-      readState_.lastDispatchPtr_ = 0;
-
-      // read error
-      if (readState_.bufferLen_ == -1) {
-        readState_.resetAllValues();
-        GlobalOutput("TFileTransport: error while reading from file");
-        throw TTransportException("TFileTransport: error while reading from file");
-      } else if (readState_.bufferLen_ == 0) { // EOF
-        // wait indefinitely if there is no timeout
-        if (readTimeout_ == TAIL_READ_TIMEOUT) {
-          THRIFT_SLEEP_USEC(eofSleepTime_);
-          continue;
-        } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
-          // reset state
-          readState_.resetState(0);
-          return nullptr;
-        } else if (readTimeout_ > 0) {
-          // timeout already expired once
-          if (readTries > 0) {
-            readState_.resetState(0);
-            return nullptr;
-          } else {
-            THRIFT_SLEEP_USEC(readTimeout_ * 1000);
-            readTries++;
-            continue;
-          }
-        }
-      }
-    }
-
-    readTries = 0;
-
-    // attempt to read an event from the buffer
-    while (readState_.bufferPtr_ < readState_.bufferLen_) {
-      if (readState_.readingSize_) {
-        if (readState_.eventSizeBuffPos_ == 0) {
-          if ((offset_ + readState_.bufferPtr_) / chunkSize_
-              != ((offset_ + readState_.bufferPtr_ + 3) / chunkSize_)) {
-            // skip one byte towards chunk boundary
-            //            T_DEBUG_L(1, "Skipping a byte");
-            readState_.bufferPtr_++;
-            continue;
-          }
-        }
-
-        readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++]
-            = readBuff_[readState_.bufferPtr_++];
-
-        if (readState_.eventSizeBuffPos_ == 4) {
-          if (readState_.getEventSize() == 0) {
-            // 0 length event indicates padding
-            //            T_DEBUG_L(1, "Got padding");
-            readState_.resetState(readState_.lastDispatchPtr_);
-            continue;
-          }
-          // got a valid event
-          readState_.readingSize_ = false;
-          if (readState_.event_) {
-            delete (readState_.event_);
-          }
-          readState_.event_ = new eventInfo();
-          readState_.event_->eventSize_ = readState_.getEventSize();
-
-          // check if the event is corrupted and perform recovery if required
-          if (isEventCorrupted()) {
-            performRecovery();
-            // start from the top
-            break;
-          }
-        }
-      } else {
-        if (!readState_.event_->eventBuff_) {
-          readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
-          readState_.event_->eventBuffPos_ = 0;
-        }
-        // take either the entire event or the remaining bytes in the buffer
-        int reclaimBuffer = (std::min)((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
-                                       readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);
-
-        // copy data from read buffer into event buffer
-        memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
-               readBuff_ + readState_.bufferPtr_,
-               reclaimBuffer);
-
-        // increment position ptrs
-        readState_.event_->eventBuffPos_ += reclaimBuffer;
-        readState_.bufferPtr_ += reclaimBuffer;
-
-        // check if the event has been read in full
-        if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
-          // set the completed event to the current event
-          eventInfo* completeEvent = readState_.event_;
-          completeEvent->eventBuffPos_ = 0;
-
-          readState_.event_ = nullptr;
-          readState_.resetState(readState_.bufferPtr_);
-
-          // exit criteria
-          return completeEvent;
-        }
-      }
-    }
-  }
-}
-
-bool TFileTransport::isEventCorrupted() {
-  // an error is triggered if:
-  if ((maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
-    // 1. Event size is larger than user-speficied max-event size
-    T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
-            readState_.event_->eventSize_,
-            maxEventSize_);
-    return true;
-  } else if (readState_.event_->eventSize_ > chunkSize_) {
-    // 2. Event size is larger than chunk size
-    T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
-            readState_.event_->eventSize_,
-            chunkSize_);
-    return true;
-  } else if (((offset_ + readState_.bufferPtr_ - 4) / chunkSize_)
-             != ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)
-                 / chunkSize_)) {
-    // 3. size indicates that event crosses chunk boundary
-    T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u  Offset:%lu",
-            readState_.event_->eventSize_,
-            static_cast<unsigned long>(offset_ + readState_.bufferPtr_ + 4));
-
-    return true;
-  }
-
-  return false;
-}
-
-void TFileTransport::performRecovery() {
-  // perform some kickass recovery
-  uint32_t curChunk = getCurChunk();
-  if (lastBadChunk_ == curChunk) {
-    numCorruptedEventsInChunk_++;
-  } else {
-    lastBadChunk_ = curChunk;
-    numCorruptedEventsInChunk_ = 1;
-  }
-
-  if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
-    // maybe there was an error in reading the file from disk
-    // seek to the beginning of chunk and try again
-    seekToChunk(curChunk);
-  } else {
-
-    // just skip ahead to the next chunk if we not already at the last chunk
-    if (curChunk != (getNumChunks() - 1)) {
-      seekToChunk(curChunk + 1);
-    } else if (readTimeout_ == TAIL_READ_TIMEOUT) {
-      // if tailing the file, wait until there is enough data to start
-      // the next chunk
-      while (curChunk == (getNumChunks() - 1)) {
-        THRIFT_SLEEP_USEC(corruptedEventSleepTime_);
-      }
-      seekToChunk(curChunk + 1);
-    } else {
-      // pretty hosed at this stage, rewind the file back to the last successful
-      // point and punt on the error
-      readState_.resetState(readState_.lastDispatchPtr_);
-      currentEvent_ = nullptr;
-      char errorMsg[1024];
-      sprintf(errorMsg,
-              "TFileTransport: log file corrupted at offset: %lu",
-              static_cast<unsigned long>(offset_ + readState_.lastDispatchPtr_));
-
-      GlobalOutput(errorMsg);
-      throw TTransportException(errorMsg);
-    }
-  }
-}
-
-void TFileTransport::seekToChunk(int32_t chunk) {
-  if (fd_ <= 0) {
-    throw TTransportException("File not open");
-  }
-
-  int32_t numChunks = getNumChunks();
-
-  // file is empty, seeking to chunk is pointless
-  if (numChunks == 0) {
-    return;
-  }
-
-  // negative indicates reverse seek (from the end)
-  if (chunk < 0) {
-    chunk += numChunks;
-  }
-
-  // too large a value for reverse seek, just seek to beginning
-  if (chunk < 0) {
-    T_DEBUG("%s", "Incorrect value for reverse seek. Seeking to beginning...");
-    chunk = 0;
-  }
-
-  // cannot seek past EOF
-  bool seekToEnd = false;
-  off_t minEndOffset = 0;
-  if (chunk >= numChunks) {
-    T_DEBUG("%s", "Trying to seek past EOF. Seeking to EOF instead...");
-    seekToEnd = true;
-    chunk = numChunks - 1;
-    // this is the min offset to process events till
-    minEndOffset = ::THRIFT_LSEEK(fd_, 0, SEEK_END);
-  }
-
-  off_t newOffset = off_t(chunk) * chunkSize_;
-  offset_ = ::THRIFT_LSEEK(fd_, newOffset, SEEK_SET);
-  readState_.resetAllValues();
-  currentEvent_ = nullptr;
-  if (offset_ == -1) {
-    GlobalOutput("TFileTransport: lseek error in seekToChunk");
-    throw TTransportException("TFileTransport: lseek error in seekToChunk");
-  }
-
-  // seek to EOF if user wanted to go to last chunk
-  if (seekToEnd) {
-    uint32_t oldReadTimeout = getReadTimeout();
-    setReadTimeout(NO_TAIL_READ_TIMEOUT);
-    // keep on reading unti the last event at point of seekChunk call
-    shared_ptr<eventInfo> event;
-    while ((offset_ + readState_.bufferPtr_) < minEndOffset) {
-      event.reset(readEvent());
-      if (event.get() == nullptr) {
-        break;
-      }
-    }
-    setReadTimeout(oldReadTimeout);
-  }
-}
-
-void TFileTransport::seekToEnd() {
-  seekToChunk(getNumChunks());
-}
-
-uint32_t TFileTransport::getNumChunks() {
-  if (fd_ <= 0) {
-    return 0;
-  }
-
-  struct THRIFT_STAT f_info;
-  int rv = ::THRIFT_FSTAT(fd_, &f_info);
-
-  if (rv < 0) {
-    int errno_copy = THRIFT_ERRNO;
-    throw TTransportException(TTransportException::UNKNOWN,
-                              "TFileTransport::getNumChunks() (fstat)",
-                              errno_copy);
-  }
-
-  if (f_info.st_size > 0) {
-    size_t numChunks = ((f_info.st_size) / chunkSize_) + 1;
-    if (numChunks > (std::numeric_limits<uint32_t>::max)())
-      throw TTransportException("Too many chunks");
-    return static_cast<uint32_t>(numChunks);
-  }
-
-  // empty file has no chunks
-  return 0;
-}
-
-uint32_t TFileTransport::getCurChunk() {
-  return static_cast<uint32_t>(offset_ / chunkSize_);
-}
-
-// Utility Functions
-void TFileTransport::openLogFile() {
-#ifndef _WIN32
-  mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
-  int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
-#else
-  int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE;
-  int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND;
-#endif
-  fd_ = ::THRIFT_OPEN(filename_.c_str(), flags, mode);
-  offset_ = 0;
-
-  // make sure open call was successful
-  if (fd_ == -1) {
-    int errno_copy = THRIFT_ERRNO;
-    GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);
-    throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
-  }
-}
-
-std::chrono::time_point<std::chrono::steady_clock> TFileTransport::getNextFlushTime() {
-  return std::chrono::steady_clock::now() + std::chrono::microseconds(flushMaxUs_);
-}
-
-TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
-  : bufferMode_(WRITE), writePoint_(0), readPoint_(0), size_(size) {
-  buffer_ = new eventInfo* [size];
-}
-
-TFileTransportBuffer::~TFileTransportBuffer() {
-  if (buffer_) {
-    for (uint32_t i = 0; i < writePoint_; i++) {
-      delete buffer_[i];
-    }
-    delete[] buffer_;
-    buffer_ = nullptr;
-  }
-}
-
-bool TFileTransportBuffer::addEvent(eventInfo* event) {
-  if (bufferMode_ == READ) {
-    GlobalOutput("Trying to write to a buffer in read mode");
-  }
-  if (writePoint_ < size_) {
-    buffer_[writePoint_++] = event;
-    return true;
-  } else {
-    // buffer is full
-    return false;
-  }
-}
-
-eventInfo* TFileTransportBuffer::getNext() {
-  if (bufferMode_ == WRITE) {
-    bufferMode_ = READ;
-  }
-  if (readPoint_ < writePoint_) {
-    return buffer_[readPoint_++];
-  } else {
-    // no more entries
-    return nullptr;
-  }
-}
-
-void TFileTransportBuffer::reset() {
-  if (bufferMode_ == WRITE || writePoint_ > readPoint_) {
-    T_DEBUG("%s", "Resetting a buffer with unread entries");
-  }
-  // Clean up the old entries
-  for (uint32_t i = 0; i < writePoint_; i++) {
-    delete buffer_[i];
-  }
-  bufferMode_ = WRITE;
-  writePoint_ = 0;
-  readPoint_ = 0;
-}
-
-bool TFileTransportBuffer::isFull() {
-  return writePoint_ == size_;
-}
-
-bool TFileTransportBuffer::isEmpty() {
-  return writePoint_ == 0;
-}
-
-TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
-                               shared_ptr<TProtocolFactory> protocolFactory,
-                               shared_ptr<TFileReaderTransport> inputTransport)
-  : processor_(processor),
-    inputProtocolFactory_(protocolFactory),
-    outputProtocolFactory_(protocolFactory),
-    inputTransport_(inputTransport) {
-
-  // default the output transport to a null transport (common case)
-  outputTransport_ = std::make_shared<TNullTransport>();
-}
-
-TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
-                               shared_ptr<TProtocolFactory> inputProtocolFactory,
-                               shared_ptr<TProtocolFactory> outputProtocolFactory,
-                               shared_ptr<TFileReaderTransport> inputTransport)
-  : processor_(processor),
-    inputProtocolFactory_(inputProtocolFactory),
-    outputProtocolFactory_(outputProtocolFactory),
-    inputTransport_(inputTransport) {
-
-  // default the output transport to a null transport (common case)
-  outputTransport_ = std::make_shared<TNullTransport>();
-}
-
-TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
-                               shared_ptr<TProtocolFactory> protocolFactory,
-                               shared_ptr<TFileReaderTransport> inputTransport,
-                               shared_ptr<TTransport> outputTransport)
-  : processor_(processor),
-    inputProtocolFactory_(protocolFactory),
-    outputProtocolFactory_(protocolFactory),
-    inputTransport_(inputTransport),
-    outputTransport_(outputTransport) {
-}
-
-void TFileProcessor::process(uint32_t numEvents, bool tail) {
-  shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
-  shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
-
-  // set the read timeout to 0 if tailing is required
-  int32_t oldReadTimeout = inputTransport_->getReadTimeout();
-  if (tail) {
-    // save old read timeout so it can be restored
-    inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);
-  }
-
-  uint32_t numProcessed = 0;
-  while (1) {
-    // bad form to use exceptions for flow control but there is really
-    // no other way around it
-    try {
-      processor_->process(inputProtocol, outputProtocol, nullptr);
-      numProcessed++;
-      if ((numEvents > 0) && (numProcessed == numEvents)) {
-        return;
-      }
-    } catch (TEOFException&) {
-      if (!tail) {
-        break;
-      }
-    } catch (TException& te) {
-      cerr << te.what() << endl;
-      break;
-    }
-  }
-
-  // restore old read timeout
-  if (tail) {
-    inputTransport_->setReadTimeout(oldReadTimeout);
-  }
-}
-
-void TFileProcessor::processChunk() {
-  shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
-  shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
-
-  uint32_t curChunk = inputTransport_->getCurChunk();
-
-  while (1) {
-    // bad form to use exceptions for flow control but there is really
-    // no other way around it
-    try {
-      processor_->process(inputProtocol, outputProtocol, nullptr);
-      if (curChunk != inputTransport_->getCurChunk()) {
-        break;
-      }
-    } catch (TEOFException&) {
-      break;
-    } catch (TException& te) {
-      cerr << te.what() << endl;
-      break;
-    }
-  }
-}
-}
-}
-} // apache::thrift::transport