2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_
21 #define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1
27 #include <thrift/transport/TTransport.h>
28 // Include the buffered transports that used to be defined here.
29 #include <thrift/transport/TBufferTransports.h>
30 #include <thrift/transport/TFileTransport.h>
37 * The null transport is a dummy transport that doesn't actually do anything.
38 * It's sort of an analogy to /dev/null, you can never read anything from it
39 * and it will let you write anything you want to it, though it won't actually
43 class TNullTransport
: public TVirtualTransport
<TNullTransport
> {
45 TNullTransport() = default;
47 ~TNullTransport() override
= default;
49 bool isOpen() const override
{ return true; }
51 void open() override
{}
53 void write(const uint8_t* /* buf */, uint32_t /* len */) { return; }
57 * TPipedTransport. This transport allows piping of a request from one
58 * transport to another either when readEnd() or writeEnd(). The typical
59 * use case for this is to log a request or a reply to disk.
60 * The underlying buffer expands to a keep a copy of the entire
64 class TPipedTransport
: virtual public TTransport
{
66 TPipedTransport(std::shared_ptr
<TTransport
> srcTrans
, std::shared_ptr
<TTransport
> dstTrans
)
67 : srcTrans_(srcTrans
),
75 // default is to to pipe the request when readEnd() is called
79 rBuf_
= (uint8_t*)std::malloc(sizeof(uint8_t) * rBufSize_
);
80 if (rBuf_
== nullptr) {
81 throw std::bad_alloc();
83 wBuf_
= (uint8_t*)std::malloc(sizeof(uint8_t) * wBufSize_
);
84 if (wBuf_
== nullptr) {
85 throw std::bad_alloc();
89 TPipedTransport(std::shared_ptr
<TTransport
> srcTrans
,
90 std::shared_ptr
<TTransport
> dstTrans
,
92 : srcTrans_(srcTrans
),
100 rBuf_
= (uint8_t*)std::malloc(sizeof(uint8_t) * rBufSize_
);
101 if (rBuf_
== nullptr) {
102 throw std::bad_alloc();
104 wBuf_
= (uint8_t*)std::malloc(sizeof(uint8_t) * wBufSize_
);
105 if (wBuf_
== nullptr) {
106 throw std::bad_alloc();
110 ~TPipedTransport() override
{
115 bool isOpen() const override
{ return srcTrans_
->isOpen(); }
117 bool peek() override
{
118 if (rPos_
>= rLen_
) {
119 // Double the size of the underlying buffer if it is full
120 if (rLen_
== rBufSize_
) {
122 auto * tmpBuf
= (uint8_t*)std::realloc(rBuf_
, sizeof(uint8_t) * rBufSize_
);
123 if (tmpBuf
== nullptr) {
124 throw std::bad_alloc();
129 // try to fill up the buffer
130 rLen_
+= srcTrans_
->read(rBuf_
+ rPos_
, rBufSize_
- rPos_
);
132 return (rLen_
> rPos_
);
135 void open() override
{ srcTrans_
->open(); }
137 void close() override
{ srcTrans_
->close(); }
139 void setPipeOnRead(bool pipeVal
) { pipeOnRead_
= pipeVal
; }
141 void setPipeOnWrite(bool pipeVal
) { pipeOnWrite_
= pipeVal
; }
143 uint32_t read(uint8_t* buf
, uint32_t len
);
145 uint32_t readEnd() override
{
148 dstTrans_
->write(rBuf_
, rPos_
);
152 srcTrans_
->readEnd();
154 // If requests are being pipelined, copy down our read-ahead data,
155 // then reset our state.
156 int read_ahead
= rLen_
- rPos_
;
157 uint32_t bytes
= rPos_
;
158 memcpy(rBuf_
, rBuf_
+ rPos_
, read_ahead
);
165 void write(const uint8_t* buf
, uint32_t len
);
167 uint32_t writeEnd() override
{
169 dstTrans_
->write(wBuf_
, wLen_
);
175 void flush() override
;
177 std::shared_ptr
<TTransport
> getTargetTransport() { return dstTrans_
; }
180 * Override TTransport *_virt() functions to invoke our implementations.
181 * We cannot use TVirtualTransport to provide these, since we need to inherit
182 * virtually from TTransport.
184 uint32_t read_virt(uint8_t* buf
, uint32_t len
) override
{ return this->read(buf
, len
); }
185 void write_virt(const uint8_t* buf
, uint32_t len
) override
{ this->write(buf
, len
); }
188 std::shared_ptr
<TTransport
> srcTrans_
;
189 std::shared_ptr
<TTransport
> dstTrans_
;
205 * Wraps a transport into a pipedTransport instance.
208 class TPipedTransportFactory
: public TTransportFactory
{
210 TPipedTransportFactory() = default;
211 TPipedTransportFactory(std::shared_ptr
<TTransport
> dstTrans
) {
212 initializeTargetTransport(dstTrans
);
214 ~TPipedTransportFactory() override
= default;
217 * Wraps the base transport into a piped transport.
219 std::shared_ptr
<TTransport
> getTransport(std::shared_ptr
<TTransport
> srcTrans
) override
{
220 return std::shared_ptr
<TTransport
>(new TPipedTransport(srcTrans
, dstTrans_
));
223 virtual void initializeTargetTransport(std::shared_ptr
<TTransport
> dstTrans
) {
224 if (dstTrans_
.get() == nullptr) {
225 dstTrans_
= dstTrans
;
227 throw TException("Target transport already initialized");
232 std::shared_ptr
<TTransport
> dstTrans_
;
236 * TPipedFileTransport. This is just like a TTransport, except that
237 * it is a templatized class, so that clients who rely on a specific
238 * TTransport can still access the original transport.
241 class TPipedFileReaderTransport
: public TPipedTransport
, public TFileReaderTransport
{
243 TPipedFileReaderTransport(std::shared_ptr
<TFileReaderTransport
> srcTrans
,
244 std::shared_ptr
<TTransport
> dstTrans
);
246 ~TPipedFileReaderTransport() override
;
248 // TTransport functions
249 bool isOpen() const override
;
250 bool peek() override
;
251 void open() override
;
252 void close() override
;
253 uint32_t read(uint8_t* buf
, uint32_t len
);
254 uint32_t readAll(uint8_t* buf
, uint32_t len
);
255 uint32_t readEnd() override
;
256 void write(const uint8_t* buf
, uint32_t len
);
257 uint32_t writeEnd() override
;
258 void flush() override
;
260 // TFileReaderTransport functions
261 int32_t getReadTimeout() override
;
262 void setReadTimeout(int32_t readTimeout
) override
;
263 uint32_t getNumChunks() override
;
264 uint32_t getCurChunk() override
;
265 void seekToChunk(int32_t chunk
) override
;
266 void seekToEnd() override
;
269 * Override TTransport *_virt() functions to invoke our implementations.
270 * We cannot use TVirtualTransport to provide these, since we need to inherit
271 * virtually from TTransport.
273 uint32_t read_virt(uint8_t* buf
, uint32_t len
) override
{ return this->read(buf
, len
); }
274 uint32_t readAll_virt(uint8_t* buf
, uint32_t len
) override
{ return this->readAll(buf
, len
); }
275 void write_virt(const uint8_t* buf
, uint32_t len
) override
{ this->write(buf
, len
); }
279 TPipedFileReaderTransport();
280 std::shared_ptr
<TFileReaderTransport
> srcTrans_
;
284 * Creates a TPipedFileReaderTransport from a filepath and a destination transport
287 class TPipedFileReaderTransportFactory
: public TPipedTransportFactory
{
289 TPipedFileReaderTransportFactory() = default;
290 TPipedFileReaderTransportFactory(std::shared_ptr
<TTransport
> dstTrans
)
291 : TPipedTransportFactory(dstTrans
) {}
292 ~TPipedFileReaderTransportFactory() override
= default;
294 std::shared_ptr
<TTransport
> getTransport(std::shared_ptr
<TTransport
> srcTrans
) override
{
295 std::shared_ptr
<TFileReaderTransport
> pFileReaderTransport
296 = std::dynamic_pointer_cast
<TFileReaderTransport
>(srcTrans
);
297 if (pFileReaderTransport
.get() != nullptr) {
298 return getFileReaderTransport(pFileReaderTransport
);
300 return std::shared_ptr
<TTransport
>();
304 std::shared_ptr
<TFileReaderTransport
> getFileReaderTransport(
305 std::shared_ptr
<TFileReaderTransport
> srcTrans
) {
306 return std::shared_ptr
<TFileReaderTransport
>(
307 new TPipedFileReaderTransport(srcTrans
, dstTrans_
));
312 } // apache::thrift::transport
314 #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_