2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 #ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
21 #define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
23 #include <thrift/transport/TTransport.h>
24 #include <thrift/Thrift.h>
25 #include <thrift/TProcessor.h>
31 #include <thrift/concurrency/Mutex.h>
32 #include <thrift/concurrency/Monitor.h>
33 #include <thrift/concurrency/ThreadFactory.h>
34 #include <thrift/concurrency/Thread.h>
40 using apache::thrift::TProcessor
;
41 using apache::thrift::protocol::TProtocolFactory
;
42 using apache::thrift::concurrency::Mutex
;
43 using apache::thrift::concurrency::Monitor
;
45 // Data pertaining to a single event
46 typedef struct eventInfo
{
49 uint32_t eventBuffPos_
;
51 eventInfo() : eventBuff_(nullptr), eventSize_(0), eventBuffPos_(0){};
59 // information about current read state
60 typedef struct readState
{
63 // keep track of event size
64 uint8_t eventSizeBuff_
[4];
65 uint8_t eventSizeBuffPos_
;
68 // read buffer variables
72 // last successful dispatch point
73 int32_t lastDispatchPtr_
;
75 void resetState(uint32_t lastDispatchPtr
) {
77 eventSizeBuffPos_
= 0;
78 lastDispatchPtr_
= lastDispatchPtr
;
81 void resetAllValues() {
91 inline uint32_t getEventSize() {
92 const void* buffer
= reinterpret_cast<const void*>(eventSizeBuff_
);
93 return *reinterpret_cast<const uint32_t*>(buffer
);
110 * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events
111 * to be written to disk. Should be used in the following way:
113 * 2) Buffer written to (addEvent)
114 * 3) Buffer read from (getNext)
115 * 4) Buffer reset (reset)
116 * 5) Go back to 2, or destroy buffer
118 * The buffer should never be written to after it is read from, unless it is reset first.
119 * Note: The above rules are enforced mainly to aid debugging of the buffer's sole client, TFileTransport,
120 * which uses the buffer in this way.
123 class TFileTransportBuffer
{
// Creates a buffer. NOTE(review): `size` is presumably the maximum number of
// events the buffer can hold — confirm against the constructor definition.
125 TFileTransportBuffer(uint32_t size
);
// NOTE(review): whether queued eventInfo objects are freed here is not visible
// in this header — confirm.
126 ~TFileTransportBuffer();
// Queues `event` (step 2 of the usage protocol). Per the class contract it must
// not be called after reading has begun unless the buffer was reset first.
// NOTE(review): the meaning of the bool result (e.g. false when full) is not
// visible here — confirm.
128 bool addEvent(eventInfo
* event
);
// Returns the next queued event (step 3 of the usage protocol).
// NOTE(review): presumably nullptr once all events are consumed — confirm.
129 eventInfo
* getNext();
// Declared only so it is not used; construct the buffer with an explicit size.
135 TFileTransportBuffer(); // should not be used
// Phase of the write-then-read protocol the buffer is currently in.
137 enum mode
{ WRITE
, READ
};
140 uint32_t writePoint_
;
147 * Abstract interface for transports used to read files
149 class TFileReaderTransport
: virtual public TTransport
{
// Read-timeout accessors. TFileTransport, one implementation, defines the
// special values TAIL_READ_TIMEOUT (-1) and NO_TAIL_READ_TIMEOUT (0).
151 virtual int32_t getReadTimeout() = 0;
152 virtual void setReadTimeout(int32_t readTimeout
) = 0;
// Chunk navigation: the file is treated as a sequence of chunks.
// NOTE(review): `chunk` is signed — whether negative values are accepted is
// implementation-defined and not visible here.
154 virtual uint32_t getNumChunks() = 0;
155 virtual uint32_t getCurChunk() = 0;
156 virtual void seekToChunk(int32_t chunk
) = 0;
157 virtual void seekToEnd() = 0;
161 * Abstract interface for transports used to write files
163 class TFileWriterTransport
: virtual public TTransport
{
// Size of the chunks the output file is split into. TFileTransport's default
// is DEFAULT_CHUNK_SIZE (16 * 1024 * 1024); the unit is presumably bytes.
165 virtual uint32_t getChunkSize() = 0;
166 virtual void setChunkSize(uint32_t chunkSize
) = 0;
170 * File implementation of a transport. Reads and writes are done to a
174 class TFileTransport
: public TFileReaderTransport
, public TFileWriterTransport
{
// Constructs a transport over the file named by `path`; `readOnly` defaults to
// false. NOTE(review): this constructor is not `explicit`, so a std::string
// converts implicitly to a TFileTransport — confirm that is intended.
176 TFileTransport(std::string path
, bool readOnly
= false);
177 ~TFileTransport() override
;
179 // TODO: what is the correct behaviour for this?
180 // the log file is generally always open
// Always reports true, regardless of the underlying file's state.
181 bool isOpen() const override
{ return true; }
// Non-virtual I/O entry points; read_virt/readAll_virt/write_virt forward here.
// NOTE(review): write() presumably hands data to the writer thread (via
// enqueueEvent) rather than writing synchronously — confirm in the .cpp.
183 void write(const uint8_t* buf
, uint32_t len
);
// NOTE(review): presumably requests that buffered events be flushed by the
// writer thread (cf. forceFlush_) — confirm.
184 void flush() override
;
186 uint32_t readAll(uint8_t* buf
, uint32_t len
);
187 uint32_t read(uint8_t* buf
, uint32_t len
);
188 bool peek() override
;
190 // log-file specific functions
// Implementations of the TFileReaderTransport chunk-navigation interface.
191 void seekToChunk(int32_t chunk
) override
;
192 void seekToEnd() override
;
193 uint32_t getNumChunks() override
;
194 uint32_t getCurChunk() override
;
196 // for changing the output file
// Redirects output to descriptor `fd` for `filename`.
// NOTE(review): presumably `offset` is the position within the new file to
// continue from, and ownership of `fd` passes to the transport — confirm.
197 void resetOutputFile(int fd
, std::string filename
, off_t offset
);
199 // Setter/Getter functions for user-controllable options
200 void setReadBuffSize(uint32_t readBuffSize
) {
202 readBuffSize_
= readBuffSize
;
// Returns the configured read buffer size (readBuffSize_).
205 uint32_t getReadBuffSize() { return readBuffSize_
; }
// Special read-timeout values. NOTE(review): judging by the names,
// TAIL_READ_TIMEOUT (-1) presumably keeps waiting for new data at EOF and
// NO_TAIL_READ_TIMEOUT (0) does not wait — confirm in the .cpp.
207 static const int32_t TAIL_READ_TIMEOUT
= -1;
208 static const int32_t NO_TAIL_READ_TIMEOUT
= 0;
// Read-timeout accessors. The unit is presumably milliseconds (cf.
// DEFAULT_READ_TIMEOUT_MS) — confirm.
209 void setReadTimeout(int32_t readTimeout
) override
{ readTimeout_
= readTimeout
; }
210 int32_t getReadTimeout() override
{ return readTimeout_
; }
212 void setChunkSize(uint32_t chunkSize
) override
{
214 chunkSize_
= chunkSize
;
// Returns the chunk size the file is split into (default DEFAULT_CHUNK_SIZE).
217 uint32_t getChunkSize() override
{ return chunkSize_
; }
219 void setEventBufferSize(uint32_t bufferSize
) {
220 if (bufferAndThreadInitialized_
) {
221 GlobalOutput("Cannot change the buffer size after writer thread started");
224 eventBufferSize_
= bufferSize
;
// Returns the event buffer size. setEventBufferSize reports an error once the
// writer thread and buffers have been initialized.
227 uint32_t getEventBufferSize() { return eventBufferSize_
; }
229 void setFlushMaxUs(uint32_t flushMaxUs
) {
231 flushMaxUs_
= flushMaxUs
;
// Returns the maximum number of microseconds that may pass without a flush.
234 uint32_t getFlushMaxUs() { return flushMaxUs_
; }
236 void setFlushMaxBytes(uint32_t flushMaxBytes
) {
238 flushMaxBytes_
= flushMaxBytes
;
// Returns the maximum number of bytes that may be written without a flush.
241 uint32_t getFlushMaxBytes() { return flushMaxBytes_
; }
// Maximum event size accessors. NOTE(review): DEFAULT_MAX_EVENT_SIZE is 0,
// which presumably means "no limit" — confirm in the .cpp.
243 void setMaxEventSize(uint32_t maxEventSize
) { maxEventSize_
= maxEventSize
; }
244 uint32_t getMaxEventSize() { return maxEventSize_
; }
246 void setMaxCorruptedEvents(uint32_t maxCorruptedEvents
) {
247 maxCorruptedEvents_
= maxCorruptedEvents
;
// Returns the maximum number of corrupted events tolerated per chunk.
249 uint32_t getMaxCorruptedEvents() { return maxCorruptedEvents_
; }
251 void setEofSleepTimeUs(uint32_t eofSleepTime
) {
253 eofSleepTime_
= eofSleepTime
;
// Returns the sleep duration, in microseconds, used when EOF is hit.
256 uint32_t getEofSleepTimeUs() { return eofSleepTime_
; }
259 * Override TTransport *_virt() functions to invoke our implementations.
260 * We cannot use TVirtualTransport to provide these, since we need to inherit
261 * virtually from TTransport.
// Each override below forwards unchanged to the matching non-virtual
// read/readAll/write member of this class.
263 uint32_t read_virt(uint8_t* buf
, uint32_t len
) override
{ return this->read(buf
, len
); }
264 uint32_t readAll_virt(uint8_t* buf
, uint32_t len
) override
{ return this->readAll(buf
, len
); }
265 void write_virt(const uint8_t* buf
, uint32_t len
) override
{ this->write(buf
, len
); }
268 // helper functions for writing to a file
// Queues an event of `eventLen` bytes starting at `buf` for writing.
269 void enqueueEvent(const uint8_t* buf
, uint32_t eventLen
);
// Swaps enqueueBuffer_ and dequeueBuffer_. NOTE(review): presumably waits no
// later than `*deadline`; the meaning of a null deadline and of the bool result
// is not visible here — confirm.
270 bool swapEventBuffers(const std::chrono::time_point
<std::chrono::steady_clock
> *deadline
);
// Sets up the event buffers and the writer thread; bufferAndThreadInitialized_
// tracks whether this has happened. NOTE(review): result meaning not visible.
271 bool initBufferAndWriteThread();
273 // control for writer thread
274 static void* startWriterThread(void* ptr
) {
275 static_cast<TFileTransport
*>(ptr
)->writerThread();
280 // helper functions for reading from a file
// Reads the next event from the file. NOTE(review): ownership of the returned
// eventInfo and the EOF behaviour (cf. TEOFException) are not visible here.
281 eventInfo
* readEvent();
283 // event corruption-related functions
284 bool isEventCorrupted();
285 void performRecovery();
// Computes when the next flush is due. NOTE(review): presumably derived from
// flushMaxUs_ — confirm.
289 std::chrono::time_point
<std::chrono::steady_clock
> getNextFlushTime();
292 readState readState_
;
294 eventInfo
* currentEvent_
;
296 uint32_t readBuffSize_
;
297 static const uint32_t DEFAULT_READ_BUFF_SIZE
= 1 * 1024 * 1024;
299 int32_t readTimeout_
;
300 static const int32_t DEFAULT_READ_TIMEOUT_MS
= 200;
302 // size of chunks that file will be split up into
304 static const uint32_t DEFAULT_CHUNK_SIZE
= 16 * 1024 * 1024;
306 // size of event buffers
307 uint32_t eventBufferSize_
;
308 static const uint32_t DEFAULT_EVENT_BUFFER_SIZE
= 10000;
310 // max number of microseconds that can pass without flushing
311 uint32_t flushMaxUs_
;
312 static const uint32_t DEFAULT_FLUSH_MAX_US
= 3000000;
314 // max number of bytes that can be written without flushing
315 uint32_t flushMaxBytes_
;
316 static const uint32_t DEFAULT_FLUSH_MAX_BYTES
= 1000 * 1024;
319 uint32_t maxEventSize_
;
320 static const uint32_t DEFAULT_MAX_EVENT_SIZE
= 0;
322 // max number of corrupted events per chunk
323 uint32_t maxCorruptedEvents_
;
324 static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS
= 0;
326 // sleep duration when EOF is hit
327 uint32_t eofSleepTime_
;
328 static const uint32_t DEFAULT_EOF_SLEEP_TIME_US
= 500 * 1000;
330 // sleep duration when a corrupted event is encountered
331 uint32_t corruptedEventSleepTime_
;
332 static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US
= 1 * 1000 * 1000;
334 // sleep duration in microseconds when an IO error is encountered in the writer thread
335 uint32_t writerThreadIOErrorSleepTime_
;
336 static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US
= 60 * 1000 * 1000;
339 apache::thrift::concurrency::ThreadFactory threadFactory_
;
340 std::shared_ptr
<apache::thrift::concurrency::Thread
> writerThread_
;
342 // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
343 // needs to be written to the file. The buffers are swapped by the writer thread.
344 TFileTransportBuffer
* dequeueBuffer_
;
345 TFileTransportBuffer
* enqueueBuffer_
;
347 // conditions used to block when the buffer is full or empty
348 Monitor notFull_
, notEmpty_
;
349 std::atomic
<bool> closing_
;
351 // To keep track of whether the buffer has been flushed
353 std::atomic
<bool> forceFlush_
;
355 // Mutex that is grabbed when enqueueing and swapping the read/write buffers
359 std::string filename_
;
362 // Whether the writer thread and buffers have been initialized
363 bool bufferAndThreadInitialized_
;
365 // Offset within the file
368 // event corruption information
369 uint32_t lastBadChunk_
;
370 uint32_t numCorruptedEventsInChunk_
;
375 // Exception thrown when EOF is hit
376 class TEOFException
: public TTransportException
{
378 TEOFException() : TTransportException(TTransportException::END_OF_FILE
){};
381 // wrapper class to process events from a file containing thrift events
382 class TFileProcessor
{
385 * Constructor that defaults output transport to null transport
387 * @param processor processes log-file events
388 * @param protocolFactory protocol factory
389 * @param inputTransport file transport
391 TFileProcessor(std::shared_ptr
<TProcessor
> processor
,
392 std::shared_ptr
<TProtocolFactory
> protocolFactory
,
393 std::shared_ptr
<TFileReaderTransport
> inputTransport
);
// Variant taking separate protocol factories for input and output.
// NOTE(review): the output transport is presumably also a null transport
// here, as for the constructor above — confirm.
395 TFileProcessor(std::shared_ptr
<TProcessor
> processor
,
396 std::shared_ptr
<TProtocolFactory
> inputProtocolFactory
,
397 std::shared_ptr
<TProtocolFactory
> outputProtocolFactory
,
398 std::shared_ptr
<TFileReaderTransport
> inputTransport
);
403 * @param processor processes log-file events
404 * @param protocolFactory protocol factory
405 * @param inputTransport input file transport
406 * @param output output transport
// NOTE(review): the single `protocolFactory` is presumably used for both the
// input and output protocols — confirm in the .cpp.
408 TFileProcessor(std::shared_ptr
<TProcessor
> processor
,
409 std::shared_ptr
<TProtocolFactory
> protocolFactory
,
410 std::shared_ptr
<TFileReaderTransport
> inputTransport
,
411 std::shared_ptr
<TTransport
> outputTransport
);
414 * processes events from the file
416 * @param numEvents number of events to process (0 for unlimited)
417 * @param tail tails the file if true
// NOTE(review): with numEvents == 0 (unlimited) and tail == true this
// presumably does not return on its own — confirm in the .cpp.
419 void process(uint32_t numEvents
, bool tail
);
422 * process events until the end of the chunk
428 std::shared_ptr
<TProcessor
> processor_
;
429 std::shared_ptr
<TProtocolFactory
> inputProtocolFactory_
;
430 std::shared_ptr
<TProtocolFactory
> outputProtocolFactory_
;
431 std::shared_ptr
<TFileReaderTransport
> inputTransport_
;
432 std::shared_ptr
<TTransport
> outputTransport_
;
436 } // apache::thrift::transport
438 #endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_