]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | #ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_ | |
21 | #define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1 | |
22 | ||
#include <thrift/transport/TTransport.h>
#include <thrift/Thrift.h>
#include <thrift/TProcessor.h>

#include <stdio.h>

#include <atomic>
#include <cstring>
#include <string>

#include <thrift/concurrency/Mutex.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Thread.h>
35 | ||
36 | namespace apache { | |
37 | namespace thrift { | |
38 | namespace transport { | |
39 | ||
40 | using apache::thrift::TProcessor; | |
41 | using apache::thrift::protocol::TProtocolFactory; | |
42 | using apache::thrift::concurrency::Mutex; | |
43 | using apache::thrift::concurrency::Monitor; | |
44 | ||
45 | // Data pertaining to a single event | |
46 | typedef struct eventInfo { | |
47 | uint8_t* eventBuff_; | |
48 | uint32_t eventSize_; | |
49 | uint32_t eventBuffPos_; | |
50 | ||
51 | eventInfo() : eventBuff_(nullptr), eventSize_(0), eventBuffPos_(0){}; | |
52 | ~eventInfo() { | |
53 | if (eventBuff_) { | |
54 | delete[] eventBuff_; | |
55 | } | |
56 | } | |
57 | } eventInfo; | |
58 | ||
59 | // information about current read state | |
60 | typedef struct readState { | |
61 | eventInfo* event_; | |
62 | ||
63 | // keep track of event size | |
64 | uint8_t eventSizeBuff_[4]; | |
65 | uint8_t eventSizeBuffPos_; | |
66 | bool readingSize_; | |
67 | ||
68 | // read buffer variables | |
69 | int32_t bufferPtr_; | |
70 | int32_t bufferLen_; | |
71 | ||
72 | // last successful dispatch point | |
73 | int32_t lastDispatchPtr_; | |
74 | ||
75 | void resetState(uint32_t lastDispatchPtr) { | |
76 | readingSize_ = true; | |
77 | eventSizeBuffPos_ = 0; | |
78 | lastDispatchPtr_ = lastDispatchPtr; | |
79 | } | |
80 | ||
81 | void resetAllValues() { | |
82 | resetState(0); | |
83 | bufferPtr_ = 0; | |
84 | bufferLen_ = 0; | |
85 | if (event_) { | |
86 | delete (event_); | |
87 | } | |
88 | event_ = nullptr; | |
89 | } | |
90 | ||
91 | inline uint32_t getEventSize() { | |
92 | const void* buffer = reinterpret_cast<const void*>(eventSizeBuff_); | |
93 | return *reinterpret_cast<const uint32_t*>(buffer); | |
94 | } | |
95 | ||
96 | readState() { | |
97 | event_ = nullptr; | |
98 | resetAllValues(); | |
99 | } | |
100 | ||
101 | ~readState() { | |
102 | if (event_) { | |
103 | delete (event_); | |
104 | } | |
105 | } | |
106 | ||
107 | } readState; | |
108 | ||
109 | /** | |
110 | * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events | |
111 | * to be written to disk. Should be used in the following way: | |
112 | * 1) Buffer created | |
113 | * 2) Buffer written to (addEvent) | |
114 | * 3) Buffer read from (getNext) | |
115 | * 4) Buffer reset (reset) | |
116 | * 5) Go back to 2, or destroy buffer | |
117 | * | |
118 | * The buffer should never be written to after it is read from, unless it is reset first. | |
119 | * Note: The above rules are enforced mainly for debugging its sole client TFileTransport | |
120 | * which uses the buffer in this way. | |
121 | * | |
122 | */ | |
123 | class TFileTransportBuffer { | |
124 | public: | |
125 | TFileTransportBuffer(uint32_t size); | |
126 | ~TFileTransportBuffer(); | |
127 | ||
128 | bool addEvent(eventInfo* event); | |
129 | eventInfo* getNext(); | |
130 | void reset(); | |
131 | bool isFull(); | |
132 | bool isEmpty(); | |
133 | ||
134 | private: | |
135 | TFileTransportBuffer(); // should not be used | |
136 | ||
137 | enum mode { WRITE, READ }; | |
138 | mode bufferMode_; | |
139 | ||
140 | uint32_t writePoint_; | |
141 | uint32_t readPoint_; | |
142 | uint32_t size_; | |
143 | eventInfo** buffer_; | |
144 | }; | |
145 | ||
146 | /** | |
147 | * Abstract interface for transports used to read files | |
148 | */ | |
149 | class TFileReaderTransport : virtual public TTransport { | |
150 | public: | |
151 | virtual int32_t getReadTimeout() = 0; | |
152 | virtual void setReadTimeout(int32_t readTimeout) = 0; | |
153 | ||
154 | virtual uint32_t getNumChunks() = 0; | |
155 | virtual uint32_t getCurChunk() = 0; | |
156 | virtual void seekToChunk(int32_t chunk) = 0; | |
157 | virtual void seekToEnd() = 0; | |
158 | }; | |
159 | ||
160 | /** | |
161 | * Abstract interface for transports used to write files | |
162 | */ | |
163 | class TFileWriterTransport : virtual public TTransport { | |
164 | public: | |
165 | virtual uint32_t getChunkSize() = 0; | |
166 | virtual void setChunkSize(uint32_t chunkSize) = 0; | |
167 | }; | |
168 | ||
169 | /** | |
170 | * File implementation of a transport. Reads and writes are done to a | |
171 | * file on disk. | |
172 | * | |
173 | */ | |
174 | class TFileTransport : public TFileReaderTransport, public TFileWriterTransport { | |
175 | public: | |
176 | TFileTransport(std::string path, bool readOnly = false); | |
177 | ~TFileTransport() override; | |
178 | ||
179 | // TODO: what is the correct behaviour for this? | |
180 | // the log file is generally always open | |
181 | bool isOpen() const override { return true; } | |
182 | ||
183 | void write(const uint8_t* buf, uint32_t len); | |
184 | void flush() override; | |
185 | ||
186 | uint32_t readAll(uint8_t* buf, uint32_t len); | |
187 | uint32_t read(uint8_t* buf, uint32_t len); | |
188 | bool peek() override; | |
189 | ||
190 | // log-file specific functions | |
191 | void seekToChunk(int32_t chunk) override; | |
192 | void seekToEnd() override; | |
193 | uint32_t getNumChunks() override; | |
194 | uint32_t getCurChunk() override; | |
195 | ||
196 | // for changing the output file | |
197 | void resetOutputFile(int fd, std::string filename, off_t offset); | |
198 | ||
199 | // Setter/Getter functions for user-controllable options | |
200 | void setReadBuffSize(uint32_t readBuffSize) { | |
201 | if (readBuffSize) { | |
202 | readBuffSize_ = readBuffSize; | |
203 | } | |
204 | } | |
205 | uint32_t getReadBuffSize() { return readBuffSize_; } | |
206 | ||
207 | static const int32_t TAIL_READ_TIMEOUT = -1; | |
208 | static const int32_t NO_TAIL_READ_TIMEOUT = 0; | |
209 | void setReadTimeout(int32_t readTimeout) override { readTimeout_ = readTimeout; } | |
210 | int32_t getReadTimeout() override { return readTimeout_; } | |
211 | ||
212 | void setChunkSize(uint32_t chunkSize) override { | |
213 | if (chunkSize) { | |
214 | chunkSize_ = chunkSize; | |
215 | } | |
216 | } | |
217 | uint32_t getChunkSize() override { return chunkSize_; } | |
218 | ||
219 | void setEventBufferSize(uint32_t bufferSize) { | |
220 | if (bufferAndThreadInitialized_) { | |
221 | GlobalOutput("Cannot change the buffer size after writer thread started"); | |
222 | return; | |
223 | } | |
224 | eventBufferSize_ = bufferSize; | |
225 | } | |
226 | ||
227 | uint32_t getEventBufferSize() { return eventBufferSize_; } | |
228 | ||
229 | void setFlushMaxUs(uint32_t flushMaxUs) { | |
230 | if (flushMaxUs) { | |
231 | flushMaxUs_ = flushMaxUs; | |
232 | } | |
233 | } | |
234 | uint32_t getFlushMaxUs() { return flushMaxUs_; } | |
235 | ||
236 | void setFlushMaxBytes(uint32_t flushMaxBytes) { | |
237 | if (flushMaxBytes) { | |
238 | flushMaxBytes_ = flushMaxBytes; | |
239 | } | |
240 | } | |
241 | uint32_t getFlushMaxBytes() { return flushMaxBytes_; } | |
242 | ||
243 | void setMaxEventSize(uint32_t maxEventSize) { maxEventSize_ = maxEventSize; } | |
244 | uint32_t getMaxEventSize() { return maxEventSize_; } | |
245 | ||
246 | void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) { | |
247 | maxCorruptedEvents_ = maxCorruptedEvents; | |
248 | } | |
249 | uint32_t getMaxCorruptedEvents() { return maxCorruptedEvents_; } | |
250 | ||
251 | void setEofSleepTimeUs(uint32_t eofSleepTime) { | |
252 | if (eofSleepTime) { | |
253 | eofSleepTime_ = eofSleepTime; | |
254 | } | |
255 | } | |
256 | uint32_t getEofSleepTimeUs() { return eofSleepTime_; } | |
257 | ||
258 | /* | |
259 | * Override TTransport *_virt() functions to invoke our implementations. | |
260 | * We cannot use TVirtualTransport to provide these, since we need to inherit | |
261 | * virtually from TTransport. | |
262 | */ | |
263 | uint32_t read_virt(uint8_t* buf, uint32_t len) override { return this->read(buf, len); } | |
264 | uint32_t readAll_virt(uint8_t* buf, uint32_t len) override { return this->readAll(buf, len); } | |
265 | void write_virt(const uint8_t* buf, uint32_t len) override { this->write(buf, len); } | |
266 | ||
267 | private: | |
268 | // helper functions for writing to a file | |
269 | void enqueueEvent(const uint8_t* buf, uint32_t eventLen); | |
270 | bool swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline); | |
271 | bool initBufferAndWriteThread(); | |
272 | ||
273 | // control for writer thread | |
274 | static void* startWriterThread(void* ptr) { | |
275 | static_cast<TFileTransport*>(ptr)->writerThread(); | |
276 | return nullptr; | |
277 | } | |
278 | void writerThread(); | |
279 | ||
280 | // helper functions for reading from a file | |
281 | eventInfo* readEvent(); | |
282 | ||
283 | // event corruption-related functions | |
284 | bool isEventCorrupted(); | |
285 | void performRecovery(); | |
286 | ||
287 | // Utility functions | |
288 | void openLogFile(); | |
289 | std::chrono::time_point<std::chrono::steady_clock> getNextFlushTime(); | |
290 | ||
291 | // Class variables | |
292 | readState readState_; | |
293 | uint8_t* readBuff_; | |
294 | eventInfo* currentEvent_; | |
295 | ||
296 | uint32_t readBuffSize_; | |
297 | static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024; | |
298 | ||
299 | int32_t readTimeout_; | |
300 | static const int32_t DEFAULT_READ_TIMEOUT_MS = 200; | |
301 | ||
302 | // size of chunks that file will be split up into | |
303 | uint32_t chunkSize_; | |
304 | static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024; | |
305 | ||
306 | // size of event buffers | |
307 | uint32_t eventBufferSize_; | |
308 | static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000; | |
309 | ||
310 | // max number of microseconds that can pass without flushing | |
311 | uint32_t flushMaxUs_; | |
312 | static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000; | |
313 | ||
314 | // max number of bytes that can be written without flushing | |
315 | uint32_t flushMaxBytes_; | |
316 | static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024; | |
317 | ||
318 | // max event size | |
319 | uint32_t maxEventSize_; | |
320 | static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0; | |
321 | ||
322 | // max number of corrupted events per chunk | |
323 | uint32_t maxCorruptedEvents_; | |
324 | static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0; | |
325 | ||
326 | // sleep duration when EOF is hit | |
327 | uint32_t eofSleepTime_; | |
328 | static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000; | |
329 | ||
330 | // sleep duration when a corrupted event is encountered | |
331 | uint32_t corruptedEventSleepTime_; | |
332 | static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000; | |
333 | ||
334 | // sleep duration in seconds when an IO error is encountered in the writer thread | |
335 | uint32_t writerThreadIOErrorSleepTime_; | |
336 | static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000; | |
337 | ||
338 | // writer thread | |
339 | apache::thrift::concurrency::ThreadFactory threadFactory_; | |
340 | std::shared_ptr<apache::thrift::concurrency::Thread> writerThread_; | |
341 | ||
342 | // buffers to hold data before it is flushed. Each element of the buffer stores a msg that | |
343 | // needs to be written to the file. The buffers are swapped by the writer thread. | |
344 | TFileTransportBuffer* dequeueBuffer_; | |
345 | TFileTransportBuffer* enqueueBuffer_; | |
346 | ||
347 | // conditions used to block when the buffer is full or empty | |
348 | Monitor notFull_, notEmpty_; | |
349 | std::atomic<bool> closing_; | |
350 | ||
351 | // To keep track of whether the buffer has been flushed | |
352 | Monitor flushed_; | |
353 | std::atomic<bool> forceFlush_; | |
354 | ||
355 | // Mutex that is grabbed when enqueueing and swapping the read/write buffers | |
356 | Mutex mutex_; | |
357 | ||
358 | // File information | |
359 | std::string filename_; | |
360 | int fd_; | |
361 | ||
362 | // Whether the writer thread and buffers have been initialized | |
363 | bool bufferAndThreadInitialized_; | |
364 | ||
365 | // Offset within the file | |
366 | off_t offset_; | |
367 | ||
368 | // event corruption information | |
369 | uint32_t lastBadChunk_; | |
370 | uint32_t numCorruptedEventsInChunk_; | |
371 | ||
372 | bool readOnly_; | |
373 | }; | |
374 | ||
375 | // Exception thrown when EOF is hit | |
376 | class TEOFException : public TTransportException { | |
377 | public: | |
378 | TEOFException() : TTransportException(TTransportException::END_OF_FILE){}; | |
379 | }; | |
380 | ||
381 | // wrapper class to process events from a file containing thrift events | |
382 | class TFileProcessor { | |
383 | public: | |
384 | /** | |
385 | * Constructor that defaults output transport to null transport | |
386 | * | |
387 | * @param processor processes log-file events | |
388 | * @param protocolFactory protocol factory | |
389 | * @param inputTransport file transport | |
390 | */ | |
391 | TFileProcessor(std::shared_ptr<TProcessor> processor, | |
392 | std::shared_ptr<TProtocolFactory> protocolFactory, | |
393 | std::shared_ptr<TFileReaderTransport> inputTransport); | |
394 | ||
395 | TFileProcessor(std::shared_ptr<TProcessor> processor, | |
396 | std::shared_ptr<TProtocolFactory> inputProtocolFactory, | |
397 | std::shared_ptr<TProtocolFactory> outputProtocolFactory, | |
398 | std::shared_ptr<TFileReaderTransport> inputTransport); | |
399 | ||
400 | /** | |
401 | * Constructor | |
402 | * | |
403 | * @param processor processes log-file events | |
404 | * @param protocolFactory protocol factory | |
405 | * @param inputTransport input file transport | |
406 | * @param output output transport | |
407 | */ | |
408 | TFileProcessor(std::shared_ptr<TProcessor> processor, | |
409 | std::shared_ptr<TProtocolFactory> protocolFactory, | |
410 | std::shared_ptr<TFileReaderTransport> inputTransport, | |
411 | std::shared_ptr<TTransport> outputTransport); | |
412 | ||
413 | /** | |
414 | * processes events from the file | |
415 | * | |
416 | * @param numEvents number of events to process (0 for unlimited) | |
417 | * @param tail tails the file if true | |
418 | */ | |
419 | void process(uint32_t numEvents, bool tail); | |
420 | ||
421 | /** | |
422 | * process events until the end of the chunk | |
423 | * | |
424 | */ | |
425 | void processChunk(); | |
426 | ||
427 | private: | |
428 | std::shared_ptr<TProcessor> processor_; | |
429 | std::shared_ptr<TProtocolFactory> inputProtocolFactory_; | |
430 | std::shared_ptr<TProtocolFactory> outputProtocolFactory_; | |
431 | std::shared_ptr<TFileReaderTransport> inputTransport_; | |
432 | std::shared_ptr<TTransport> outputTransport_; | |
433 | }; | |
434 | } | |
435 | } | |
436 | } // apache::thrift::transport | |
437 | ||
438 | #endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_ | |
439 |