]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/transport/TFileTransport.h
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / cpp / src / thrift / transport / TFileTransport.h
CommitLineData
f67539c2
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
21#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
22
#include <thrift/transport/TTransport.h>
#include <thrift/Thrift.h>
#include <thrift/TProcessor.h>

#include <atomic>
#include <chrono>
#include <cstring>
#include <string>
#include <stdio.h>

#include <thrift/concurrency/Mutex.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Thread.h>
35
36namespace apache {
37namespace thrift {
38namespace transport {
39
40using apache::thrift::TProcessor;
41using apache::thrift::protocol::TProtocolFactory;
42using apache::thrift::concurrency::Mutex;
43using apache::thrift::concurrency::Monitor;
44
// Data pertaining to a single event.
//
// Owns eventBuff_: the destructor releases it with delete[], so it must be
// allocated with new[] (or left null). Copying is disabled because a
// member-wise copy would make two objects delete[] the same buffer.
struct eventInfo {
  uint8_t* eventBuff_;
  uint32_t eventSize_;
  uint32_t eventBuffPos_;

  eventInfo() : eventBuff_(nullptr), eventSize_(0), eventBuffPos_(0) {}
  ~eventInfo() { delete[] eventBuff_; } // delete[] on nullptr is a no-op

  eventInfo(const eventInfo&) = delete;
  eventInfo& operator=(const eventInfo&) = delete;
};

// Information about the current read state.
//
// Owns event_: resetAllValues() and the destructor delete it. Copying is
// disabled for the same double-delete reason as eventInfo.
struct readState {
  // Event currently being read; owned by this object, may be null.
  eventInfo* event_;

  // Keep track of the event size: raw bytes of the 4-byte size field and the
  // current position within them (reset to 0 by resetState()).
  uint8_t eventSizeBuff_[4];
  uint8_t eventSizeBuffPos_;
  // Set to true by resetState(), i.e. the size field is expected next.
  bool readingSize_;

  // read buffer variables
  int32_t bufferPtr_;
  int32_t bufferLen_;

  // last successful dispatch point
  int32_t lastDispatchPtr_;

  // Resets the per-event state: the size field is expected next and
  // lastDispatchPtr_ is set to `lastDispatchPtr`.
  // NOTE(review): the parameter is uint32_t while the member is int32_t;
  // values above INT32_MAX do not round-trip.
  void resetState(uint32_t lastDispatchPtr) {
    readingSize_ = true;
    eventSizeBuffPos_ = 0;
    lastDispatchPtr_ = static_cast<int32_t>(lastDispatchPtr);
  }

  // Resets the per-event state, both read-buffer cursors, and frees any
  // partially-read event.
  void resetAllValues() {
    resetState(0);
    bufferPtr_ = 0;
    bufferLen_ = 0;
    delete event_; // no-op when null
    event_ = nullptr;
  }

  // Interprets eventSizeBuff_ as a uint32_t in host byte order. std::memcpy is
  // used instead of reading the byte array through a uint32_t pointer, which
  // violates strict aliasing and may be a misaligned load.
  inline uint32_t getEventSize() {
    static_assert(sizeof(eventSizeBuff_) == sizeof(uint32_t), "size field must be 4 bytes");
    uint32_t size = 0;
    std::memcpy(&size, eventSizeBuff_, sizeof(size));
    return size;
  }

  // eventSizeBuff_ is value-initialized (zeroed); every other field is set
  // by resetAllValues().
  readState() : event_(nullptr), eventSizeBuff_() { resetAllValues(); }

  ~readState() { delete event_; }

  readState(const readState&) = delete;
  readState& operator=(const readState&) = delete;
};
108
109/**
110 * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events
111 * to be written to disk. Should be used in the following way:
112 * 1) Buffer created
113 * 2) Buffer written to (addEvent)
114 * 3) Buffer read from (getNext)
115 * 4) Buffer reset (reset)
116 * 5) Go back to 2, or destroy buffer
117 *
118 * The buffer should never be written to after it is read from, unless it is reset first.
119 * Note: The above rules are enforced mainly for debugging its sole client TFileTransport
120 * which uses the buffer in this way.
121 *
122 */
123class TFileTransportBuffer {
124public:
125 TFileTransportBuffer(uint32_t size);
126 ~TFileTransportBuffer();
127
128 bool addEvent(eventInfo* event);
129 eventInfo* getNext();
130 void reset();
131 bool isFull();
132 bool isEmpty();
133
134private:
135 TFileTransportBuffer(); // should not be used
136
137 enum mode { WRITE, READ };
138 mode bufferMode_;
139
140 uint32_t writePoint_;
141 uint32_t readPoint_;
142 uint32_t size_;
143 eventInfo** buffer_;
144};
145
/**
 * Abstract interface for transports used to read files.
 *
 * TTransport is inherited virtually so that a class implementing both this
 * interface and TFileWriterTransport (e.g. TFileTransport) has a single
 * TTransport base subobject.
 */
class TFileReaderTransport : virtual public TTransport {
public:
  // Read timeout accessors. NOTE(review): presumably milliseconds, judging by
  // TFileTransport's DEFAULT_READ_TIMEOUT_MS -- confirm.
  virtual int32_t getReadTimeout() = 0;
  virtual void setReadTimeout(int32_t readTimeout) = 0;

  // Chunk navigation: query the number of chunks and the current chunk, and
  // reposition the reader to a given chunk or to the end.
  virtual uint32_t getNumChunks() = 0;
  virtual uint32_t getCurChunk() = 0;
  virtual void seekToChunk(int32_t chunk) = 0;
  virtual void seekToEnd() = 0;
};
159
/**
 * Abstract interface for transports used to write files.
 *
 * Like TFileReaderTransport, TTransport is inherited virtually so that a class
 * implementing both interfaces has a single TTransport base subobject.
 */
class TFileWriterTransport : virtual public TTransport {
public:
  // Size of the chunks that the written file is split into.
  virtual uint32_t getChunkSize() = 0;
  virtual void setChunkSize(uint32_t chunkSize) = 0;
};
168
169/**
170 * File implementation of a transport. Reads and writes are done to a
171 * file on disk.
172 *
173 */
174class TFileTransport : public TFileReaderTransport, public TFileWriterTransport {
175public:
176 TFileTransport(std::string path, bool readOnly = false);
177 ~TFileTransport() override;
178
179 // TODO: what is the correct behaviour for this?
180 // the log file is generally always open
181 bool isOpen() const override { return true; }
182
183 void write(const uint8_t* buf, uint32_t len);
184 void flush() override;
185
186 uint32_t readAll(uint8_t* buf, uint32_t len);
187 uint32_t read(uint8_t* buf, uint32_t len);
188 bool peek() override;
189
190 // log-file specific functions
191 void seekToChunk(int32_t chunk) override;
192 void seekToEnd() override;
193 uint32_t getNumChunks() override;
194 uint32_t getCurChunk() override;
195
196 // for changing the output file
197 void resetOutputFile(int fd, std::string filename, off_t offset);
198
199 // Setter/Getter functions for user-controllable options
200 void setReadBuffSize(uint32_t readBuffSize) {
201 if (readBuffSize) {
202 readBuffSize_ = readBuffSize;
203 }
204 }
205 uint32_t getReadBuffSize() { return readBuffSize_; }
206
207 static const int32_t TAIL_READ_TIMEOUT = -1;
208 static const int32_t NO_TAIL_READ_TIMEOUT = 0;
209 void setReadTimeout(int32_t readTimeout) override { readTimeout_ = readTimeout; }
210 int32_t getReadTimeout() override { return readTimeout_; }
211
212 void setChunkSize(uint32_t chunkSize) override {
213 if (chunkSize) {
214 chunkSize_ = chunkSize;
215 }
216 }
217 uint32_t getChunkSize() override { return chunkSize_; }
218
219 void setEventBufferSize(uint32_t bufferSize) {
220 if (bufferAndThreadInitialized_) {
221 GlobalOutput("Cannot change the buffer size after writer thread started");
222 return;
223 }
224 eventBufferSize_ = bufferSize;
225 }
226
227 uint32_t getEventBufferSize() { return eventBufferSize_; }
228
229 void setFlushMaxUs(uint32_t flushMaxUs) {
230 if (flushMaxUs) {
231 flushMaxUs_ = flushMaxUs;
232 }
233 }
234 uint32_t getFlushMaxUs() { return flushMaxUs_; }
235
236 void setFlushMaxBytes(uint32_t flushMaxBytes) {
237 if (flushMaxBytes) {
238 flushMaxBytes_ = flushMaxBytes;
239 }
240 }
241 uint32_t getFlushMaxBytes() { return flushMaxBytes_; }
242
243 void setMaxEventSize(uint32_t maxEventSize) { maxEventSize_ = maxEventSize; }
244 uint32_t getMaxEventSize() { return maxEventSize_; }
245
246 void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
247 maxCorruptedEvents_ = maxCorruptedEvents;
248 }
249 uint32_t getMaxCorruptedEvents() { return maxCorruptedEvents_; }
250
251 void setEofSleepTimeUs(uint32_t eofSleepTime) {
252 if (eofSleepTime) {
253 eofSleepTime_ = eofSleepTime;
254 }
255 }
256 uint32_t getEofSleepTimeUs() { return eofSleepTime_; }
257
258 /*
259 * Override TTransport *_virt() functions to invoke our implementations.
260 * We cannot use TVirtualTransport to provide these, since we need to inherit
261 * virtually from TTransport.
262 */
263 uint32_t read_virt(uint8_t* buf, uint32_t len) override { return this->read(buf, len); }
264 uint32_t readAll_virt(uint8_t* buf, uint32_t len) override { return this->readAll(buf, len); }
265 void write_virt(const uint8_t* buf, uint32_t len) override { this->write(buf, len); }
266
267private:
268 // helper functions for writing to a file
269 void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
270 bool swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline);
271 bool initBufferAndWriteThread();
272
273 // control for writer thread
274 static void* startWriterThread(void* ptr) {
275 static_cast<TFileTransport*>(ptr)->writerThread();
276 return nullptr;
277 }
278 void writerThread();
279
280 // helper functions for reading from a file
281 eventInfo* readEvent();
282
283 // event corruption-related functions
284 bool isEventCorrupted();
285 void performRecovery();
286
287 // Utility functions
288 void openLogFile();
289 std::chrono::time_point<std::chrono::steady_clock> getNextFlushTime();
290
291 // Class variables
292 readState readState_;
293 uint8_t* readBuff_;
294 eventInfo* currentEvent_;
295
296 uint32_t readBuffSize_;
297 static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
298
299 int32_t readTimeout_;
300 static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
301
302 // size of chunks that file will be split up into
303 uint32_t chunkSize_;
304 static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
305
306 // size of event buffers
307 uint32_t eventBufferSize_;
308 static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;
309
310 // max number of microseconds that can pass without flushing
311 uint32_t flushMaxUs_;
312 static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;
313
314 // max number of bytes that can be written without flushing
315 uint32_t flushMaxBytes_;
316 static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
317
318 // max event size
319 uint32_t maxEventSize_;
320 static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
321
322 // max number of corrupted events per chunk
323 uint32_t maxCorruptedEvents_;
324 static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
325
326 // sleep duration when EOF is hit
327 uint32_t eofSleepTime_;
328 static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
329
330 // sleep duration when a corrupted event is encountered
331 uint32_t corruptedEventSleepTime_;
332 static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
333
334 // sleep duration in seconds when an IO error is encountered in the writer thread
335 uint32_t writerThreadIOErrorSleepTime_;
336 static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
337
338 // writer thread
339 apache::thrift::concurrency::ThreadFactory threadFactory_;
340 std::shared_ptr<apache::thrift::concurrency::Thread> writerThread_;
341
342 // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
343 // needs to be written to the file. The buffers are swapped by the writer thread.
344 TFileTransportBuffer* dequeueBuffer_;
345 TFileTransportBuffer* enqueueBuffer_;
346
347 // conditions used to block when the buffer is full or empty
348 Monitor notFull_, notEmpty_;
349 std::atomic<bool> closing_;
350
351 // To keep track of whether the buffer has been flushed
352 Monitor flushed_;
353 std::atomic<bool> forceFlush_;
354
355 // Mutex that is grabbed when enqueueing and swapping the read/write buffers
356 Mutex mutex_;
357
358 // File information
359 std::string filename_;
360 int fd_;
361
362 // Whether the writer thread and buffers have been initialized
363 bool bufferAndThreadInitialized_;
364
365 // Offset within the file
366 off_t offset_;
367
368 // event corruption information
369 uint32_t lastBadChunk_;
370 uint32_t numCorruptedEventsInChunk_;
371
372 bool readOnly_;
373};
374
375// Exception thrown when EOF is hit
376class TEOFException : public TTransportException {
377public:
378 TEOFException() : TTransportException(TTransportException::END_OF_FILE){};
379};
380
381// wrapper class to process events from a file containing thrift events
382class TFileProcessor {
383public:
384 /**
385 * Constructor that defaults output transport to null transport
386 *
387 * @param processor processes log-file events
388 * @param protocolFactory protocol factory
389 * @param inputTransport file transport
390 */
391 TFileProcessor(std::shared_ptr<TProcessor> processor,
392 std::shared_ptr<TProtocolFactory> protocolFactory,
393 std::shared_ptr<TFileReaderTransport> inputTransport);
394
395 TFileProcessor(std::shared_ptr<TProcessor> processor,
396 std::shared_ptr<TProtocolFactory> inputProtocolFactory,
397 std::shared_ptr<TProtocolFactory> outputProtocolFactory,
398 std::shared_ptr<TFileReaderTransport> inputTransport);
399
400 /**
401 * Constructor
402 *
403 * @param processor processes log-file events
404 * @param protocolFactory protocol factory
405 * @param inputTransport input file transport
406 * @param output output transport
407 */
408 TFileProcessor(std::shared_ptr<TProcessor> processor,
409 std::shared_ptr<TProtocolFactory> protocolFactory,
410 std::shared_ptr<TFileReaderTransport> inputTransport,
411 std::shared_ptr<TTransport> outputTransport);
412
413 /**
414 * processes events from the file
415 *
416 * @param numEvents number of events to process (0 for unlimited)
417 * @param tail tails the file if true
418 */
419 void process(uint32_t numEvents, bool tail);
420
421 /**
422 * process events until the end of the chunk
423 *
424 */
425 void processChunk();
426
427private:
428 std::shared_ptr<TProcessor> processor_;
429 std::shared_ptr<TProtocolFactory> inputProtocolFactory_;
430 std::shared_ptr<TProtocolFactory> outputProtocolFactory_;
431 std::shared_ptr<TFileReaderTransport> inputTransport_;
432 std::shared_ptr<TTransport> outputTransport_;
433};
434}
435}
436} // apache::thrift::transport
437
438#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_
439