]>
git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/transport/TBufferTransports.cpp
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
23 #include <thrift/transport/TBufferTransports.h>
31 uint32_t TBufferedTransport::readSlow(uint8_t* buf
, uint32_t len
) {
32 auto have
= static_cast<uint32_t>(rBound_
- rBase_
);
34 // We should only take the slow path if we can't satisfy the read
35 // with the data already in the buffer.
38 // If we have some data in the buffer, copy it out and return it.
39 // We have to return it without attempting to read more, since we aren't
40 // guaranteed that the underlying transport actually has more data, so
41 // attempting to read from it could block.
43 memcpy(buf
, rBase_
, have
);
44 setReadBuffer(rBuf_
.get(), 0);
48 // No data is available in our buffer.
49 // Get more from underlying transport up to buffer size.
50 // Note that this makes a lot of sense if len < rBufSize_
51 // and almost no sense otherwise. TODO(dreiss): Fix that
52 // case (possibly including some readv hotness).
53 setReadBuffer(rBuf_
.get(), transport_
->read(rBuf_
.get(), rBufSize_
));
55 // Hand over whatever we have.
56 uint32_t give
= (std::min
)(len
, static_cast<uint32_t>(rBound_
- rBase_
));
57 memcpy(buf
, rBase_
, give
);
63 void TBufferedTransport::writeSlow(const uint8_t* buf
, uint32_t len
) {
64 auto have_bytes
= static_cast<uint32_t>(wBase_
- wBuf_
.get());
65 auto space
= static_cast<uint32_t>(wBound_
- wBase_
);
66 // We should only take the slow path if we can't accommodate the write
67 // with the free space already in the buffer.
68 assert(wBound_
- wBase_
< static_cast<ptrdiff_t>(len
));
70 // Now here's the tricky question: should we copy data from buf into our
71 // internal buffer and write it from there, or should we just write out
72 // the current internal buffer in one syscall and write out buf in another.
73 // If our currently buffered data plus buf is at least double our buffer
74 // size, we will have to do two syscalls no matter what (except in the
75 // degenerate case when our buffer is empty), so there is no use copying.
76 // Otherwise, there is sort of a sliding scale. If we have N-1 bytes
77 // buffered and need to write 2, it would be crazy to do two syscalls.
78 // On the other hand, if we have 2 bytes buffered and are writing 2N-3,
79 // we can save a syscall in the short term by loading up our buffer, writing
80 // it out, and copying the rest of the bytes into our buffer. Of course,
81 // if we get another 2-byte write, we haven't saved any syscalls at all,
82 // and have just copied nearly 2N bytes for nothing. Finding a perfect
83 // policy would require predicting the size of future writes, so we're just
84 // going to always eschew syscalls if we have less than 2N bytes to write.
86 // The case where we have to do two syscalls.
87 // This case also covers the case where the buffer is empty,
88 // but it is clearer (I think) to think of it as two separate cases.
89 if ((have_bytes
+ len
>= 2 * wBufSize_
) || (have_bytes
== 0)) {
90 // TODO(dreiss): writev
92 transport_
->write(wBuf_
.get(), have_bytes
);
94 transport_
->write(buf
, len
);
99 // Fill up our internal buffer for a write.
100 memcpy(wBase_
, buf
, space
);
103 transport_
->write(wBuf_
.get(), wBufSize_
);
105 // Copy the rest into our buffer.
106 assert(len
< wBufSize_
);
107 memcpy(wBuf_
.get(), buf
, len
);
108 wBase_
= wBuf_
.get() + len
;
112 const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf
, uint32_t* len
) {
115 // Simply return NULL. We don't know if there is actually data available on
116 // the underlying transport, so calling read() might block.
120 void TBufferedTransport::flush() {
121 // Write out any data waiting in the write buffer.
122 auto have_bytes
= static_cast<uint32_t>(wBase_
- wBuf_
.get());
123 if (have_bytes
> 0) {
124 // Note that we reset wBase_ prior to the underlying write
125 // to ensure we're in a sane state (i.e. internal buffer cleaned)
126 // if the underlying write throws up an exception
127 wBase_
= wBuf_
.get();
128 transport_
->write(wBuf_
.get(), have_bytes
);
131 // Flush the underlying transport.
135 uint32_t TFramedTransport::readSlow(uint8_t* buf
, uint32_t len
) {
137 auto have
= static_cast<uint32_t>(rBound_
- rBase_
);
139 // We should only take the slow path if we can't satisfy the read
140 // with the data already in the buffer.
143 // If we have some data in the buffer, copy it out and return it.
144 // We have to return it without attempting to read more, since we aren't
145 // guaranteed that the underlying transport actually has more data, so
146 // attempting to read from it could block.
148 memcpy(buf
, rBase_
, have
);
149 setReadBuffer(rBuf_
.get(), 0);
153 // Read another frame.
155 // EOF. No frame available.
159 // TODO(dreiss): Should we warn when reads cross frames?
161 // Hand over whatever we have.
162 uint32_t give
= (std::min
)(want
, static_cast<uint32_t>(rBound_
- rBase_
));
163 memcpy(buf
, rBase_
, give
);
170 bool TFramedTransport::readFrame() {
171 // TODO(dreiss): Think about using readv here, even though it would
172 // result in (gasp) read-ahead.
174 // Read the size of the next frame.
175 // We can't use readAll(&sz, sizeof(sz)), since that always throws an
176 // exception on EOF. We want to throw an exception only if EOF occurs after
177 // partial size data.
179 uint32_t size_bytes_read
= 0;
180 while (size_bytes_read
< sizeof(sz
)) {
181 uint8_t* szp
= reinterpret_cast<uint8_t*>(&sz
) + size_bytes_read
;
183 = transport_
->read(szp
, static_cast<uint32_t>(sizeof(sz
)) - size_bytes_read
);
184 if (bytes_read
== 0) {
185 if (size_bytes_read
== 0) {
186 // EOF before any data was read.
189 // EOF after a partial frame header. Raise an exception.
190 throw TTransportException(TTransportException::END_OF_FILE
,
191 "No more data to read after "
192 "partial frame header.");
195 size_bytes_read
+= bytes_read
;
201 throw TTransportException("Frame size has negative value");
204 // Check for oversized frame
205 if (sz
> static_cast<int32_t>(maxFrameSize_
))
206 throw TTransportException(TTransportException::CORRUPTED_DATA
, "Received an oversized frame");
208 // Read the frame payload, and reset markers.
209 if (sz
> static_cast<int32_t>(rBufSize_
)) {
210 rBuf_
.reset(new uint8_t[sz
]);
213 transport_
->readAll(rBuf_
.get(), sz
);
214 setReadBuffer(rBuf_
.get(), sz
);
218 void TFramedTransport::writeSlow(const uint8_t* buf
, uint32_t len
) {
219 // Double buffer size until sufficient.
220 auto have
= static_cast<uint32_t>(wBase_
- wBuf_
.get());
221 uint32_t new_size
= wBufSize_
;
222 if (len
+ have
< have
/* overflow */ || len
+ have
> 0x7fffffff) {
223 throw TTransportException(TTransportException::BAD_ARGS
,
224 "Attempted to write over 2 GB to TFramedTransport.");
226 while (new_size
< len
+ have
) {
227 new_size
= new_size
> 0 ? new_size
* 2 : 1;
230 // TODO(dreiss): Consider modifying this class to use malloc/free
231 // so we can use realloc here.
233 // Allocate new buffer.
234 auto* new_buf
= new uint8_t[new_size
];
236 // Copy the old buffer to the new one.
237 memcpy(new_buf
, wBuf_
.get(), have
);
239 // Now point buf to the new one.
240 wBuf_
.reset(new_buf
);
241 wBufSize_
= new_size
;
242 wBase_
= wBuf_
.get() + have
;
243 wBound_
= wBuf_
.get() + wBufSize_
;
245 // Copy the data into the new buffer.
246 memcpy(wBase_
, buf
, len
);
250 void TFramedTransport::flush() {
251 int32_t sz_hbo
, sz_nbo
;
252 assert(wBufSize_
> sizeof(sz_nbo
));
254 // Slip the frame size into the start of the buffer.
255 sz_hbo
= static_cast<uint32_t>(wBase_
- (wBuf_
.get() + sizeof(sz_nbo
)));
256 sz_nbo
= (int32_t)htonl((uint32_t)(sz_hbo
));
257 memcpy(wBuf_
.get(), (uint8_t*)&sz_nbo
, sizeof(sz_nbo
));
260 // Note that we reset wBase_ (with a pad for the frame size)
261 // prior to the underlying write to ensure we're in a sane state
262 // (i.e. internal buffer cleaned) if the underlying write throws
264 wBase_
= wBuf_
.get() + sizeof(sz_nbo
);
266 // Write size and frame body.
267 transport_
->write(wBuf_
.get(), static_cast<uint32_t>(sizeof(sz_nbo
)) + sz_hbo
);
270 // Flush the underlying transport.
273 // reclaim write buffer
274 if (wBufSize_
> bufReclaimThresh_
) {
275 wBufSize_
= DEFAULT_BUFFER_SIZE
;
276 wBuf_
.reset(new uint8_t[wBufSize_
]);
277 setWriteBuffer(wBuf_
.get(), wBufSize_
);
279 // reset wBase_ with a pad for the frame size
281 wBase_
= wBuf_
.get() + sizeof(pad
);
285 uint32_t TFramedTransport::writeEnd() {
286 return static_cast<uint32_t>(wBase_
- wBuf_
.get());
289 const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf
, uint32_t* len
) {
292 // Don't try to be clever with shifting buffers.
293 // If the fast path failed let the protocol use its slow path.
294 // Besides, who is going to try to borrow across messages?
298 uint32_t TFramedTransport::readEnd() {
299 // include framing bytes
300 auto bytes_read
= static_cast<uint32_t>(rBound_
- rBuf_
.get() + sizeof(uint32_t));
302 if (rBufSize_
> bufReclaimThresh_
) {
305 setReadBuffer(rBuf_
.get(), rBufSize_
);
311 void TMemoryBuffer::computeRead(uint32_t len
, uint8_t** out_start
, uint32_t* out_give
) {
312 // Correct rBound_ so we can use the fast path in the future.
315 // Decide how much to give.
316 uint32_t give
= (std::min
)(len
, available_read());
321 // Preincrement rBase_ so the caller doesn't have to.
325 uint32_t TMemoryBuffer::readSlow(uint8_t* buf
, uint32_t len
) {
328 computeRead(len
, &start
, &give
);
330 // Copy into the provided buffer.
331 memcpy(buf
, start
, give
);
336 uint32_t TMemoryBuffer::readAppendToString(std::string
& str
, uint32_t len
) {
337 // Don't get some stupid assertion failure.
338 if (buffer_
== nullptr) {
344 computeRead(len
, &start
, &give
);
346 // Append to the provided string.
347 str
.append((char*)start
, give
);
352 void TMemoryBuffer::ensureCanWrite(uint32_t len
) {
353 // Check available space
354 uint32_t avail
= available_write();
360 throw TTransportException("Insufficient space in external MemoryBuffer");
363 // Grow the buffer as necessary.
364 uint64_t new_size
= bufferSize_
;
365 while (len
> avail
) {
366 new_size
= new_size
> 0 ? new_size
* 2 : 1;
367 if (new_size
> maxBufferSize_
) {
368 throw TTransportException(TTransportException::BAD_ARGS
,
369 "Internal buffer size overflow");
371 avail
= available_write() + (static_cast<uint32_t>(new_size
) - bufferSize_
);
374 // Allocate into a new pointer so we don't bork ours if it fails.
375 auto* new_buffer
= static_cast<uint8_t*>(std::realloc(buffer_
, new_size
));
376 if (new_buffer
== nullptr) {
377 throw std::bad_alloc();
380 rBase_
= new_buffer
+ (rBase_
- buffer_
);
381 rBound_
= new_buffer
+ (rBound_
- buffer_
);
382 wBase_
= new_buffer
+ (wBase_
- buffer_
);
383 wBound_
= new_buffer
+ new_size
;
384 buffer_
= new_buffer
;
385 bufferSize_
= static_cast<uint32_t>(new_size
);
388 void TMemoryBuffer::writeSlow(const uint8_t* buf
, uint32_t len
) {
391 // Copy into the buffer and increment wBase_.
392 memcpy(wBase_
, buf
, len
);
396 void TMemoryBuffer::wroteBytes(uint32_t len
) {
397 uint32_t avail
= available_write();
399 throw TTransportException("Client wrote more bytes than size of buffer.");
404 const uint8_t* TMemoryBuffer::borrowSlow(uint8_t* buf
, uint32_t* len
) {
407 if (available_read() >= *len
) {
408 *len
= available_read();
415 } // apache::thrift::transport