git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/transport/TBufferTransports.cpp
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / cpp / src / thrift / transport / TBufferTransports.cpp
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <cassert>
21 #include <algorithm>
22
23 #include <thrift/transport/TBufferTransports.h>
24
25 using std::string;
26
27 namespace apache {
28 namespace thrift {
29 namespace transport {
30
31 uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
32 auto have = static_cast<uint32_t>(rBound_ - rBase_);
33
34 // We should only take the slow path if we can't satisfy the read
35 // with the data already in the buffer.
36 assert(have < len);
37
38 // If we have some data in the buffer, copy it out and return it.
39 // We have to return it without attempting to read more, since we aren't
40 // guaranteed that the underlying transport actually has more data, so
41 // attempting to read from it could block.
42 if (have > 0) {
43 memcpy(buf, rBase_, have);
44 setReadBuffer(rBuf_.get(), 0);
45 return have;
46 }
47
48 // No data is available in our buffer.
49 // Get more from underlying transport up to buffer size.
50 // Note that this makes a lot of sense if len < rBufSize_
51 // and almost no sense otherwise. TODO(dreiss): Fix that
52 // case (possibly including some readv hotness).
53 setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
54
55 // Hand over whatever we have.
56 uint32_t give = (std::min)(len, static_cast<uint32_t>(rBound_ - rBase_));
57 memcpy(buf, rBase_, give);
58 rBase_ += give;
59
60 return give;
61 }
62
63 void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
64 auto have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get());
65 auto space = static_cast<uint32_t>(wBound_ - wBase_);
66 // We should only take the slow path if we can't accommodate the write
67 // with the free space already in the buffer.
68 assert(wBound_ - wBase_ < static_cast<ptrdiff_t>(len));
69
70 // Now here's the tricky question: should we copy data from buf into our
71 // internal buffer and write it from there, or should we just write out
72 // the current internal buffer in one syscall and write out buf in another.
73 // If our currently buffered data plus buf is at least double our buffer
74 // size, we will have to do two syscalls no matter what (except in the
75 // degenerate case when our buffer is empty), so there is no use copying.
76 // Otherwise, there is sort of a sliding scale. If we have N-1 bytes
77 // buffered and need to write 2, it would be crazy to do two syscalls.
78 // On the other hand, if we have 2 bytes buffered and are writing 2N-3,
79 // we can save a syscall in the short term by loading up our buffer, writing
80 // it out, and copying the rest of the bytes into our buffer. Of course,
81 // if we get another 2-byte write, we haven't saved any syscalls at all,
82 // and have just copied nearly 2N bytes for nothing. Finding a perfect
83 // policy would require predicting the size of future writes, so we're just
84 // going to always eschew syscalls if we have less than 2N bytes to write.
85
86 // The case where we have to do two syscalls.
87 // This case also covers the case where the buffer is empty,
88 // but it is clearer (I think) to think of it as two separate cases.
89 if ((have_bytes + len >= 2 * wBufSize_) || (have_bytes == 0)) {
90 // TODO(dreiss): writev
91 if (have_bytes > 0) {
92 transport_->write(wBuf_.get(), have_bytes);
93 }
94 transport_->write(buf, len);
95 wBase_ = wBuf_.get();
96 return;
97 }
98
99 // Fill up our internal buffer for a write.
100 memcpy(wBase_, buf, space);
101 buf += space;
102 len -= space;
103 transport_->write(wBuf_.get(), wBufSize_);
104
105 // Copy the rest into our buffer.
106 assert(len < wBufSize_);
107 memcpy(wBuf_.get(), buf, len);
108 wBase_ = wBuf_.get() + len;
109 return;
110 }
111
112 const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
113 (void)buf;
114 (void)len;
115 // Simply return NULL. We don't know if there is actually data available on
116 // the underlying transport, so calling read() might block.
117 return nullptr;
118 }
119
120 void TBufferedTransport::flush() {
121 // Write out any data waiting in the write buffer.
122 auto have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get());
123 if (have_bytes > 0) {
124 // Note that we reset wBase_ prior to the underlying write
125 // to ensure we're in a sane state (i.e. internal buffer cleaned)
126 // if the underlying write throws up an exception
127 wBase_ = wBuf_.get();
128 transport_->write(wBuf_.get(), have_bytes);
129 }
130
131 // Flush the underlying transport.
132 transport_->flush();
133 }
134
135 uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) {
136 uint32_t want = len;
137 auto have = static_cast<uint32_t>(rBound_ - rBase_);
138
139 // We should only take the slow path if we can't satisfy the read
140 // with the data already in the buffer.
141 assert(have < want);
142
143 // If we have some data in the buffer, copy it out and return it.
144 // We have to return it without attempting to read more, since we aren't
145 // guaranteed that the underlying transport actually has more data, so
146 // attempting to read from it could block.
147 if (have > 0) {
148 memcpy(buf, rBase_, have);
149 setReadBuffer(rBuf_.get(), 0);
150 return have;
151 }
152
153 // Read another frame.
154 if (!readFrame()) {
155 // EOF. No frame available.
156 return 0;
157 }
158
159 // TODO(dreiss): Should we warn when reads cross frames?
160
161 // Hand over whatever we have.
162 uint32_t give = (std::min)(want, static_cast<uint32_t>(rBound_ - rBase_));
163 memcpy(buf, rBase_, give);
164 rBase_ += give;
165 want -= give;
166
167 return (len - want);
168 }
169
170 bool TFramedTransport::readFrame() {
171 // TODO(dreiss): Think about using readv here, even though it would
172 // result in (gasp) read-ahead.
173
174 // Read the size of the next frame.
175 // We can't use readAll(&sz, sizeof(sz)), since that always throws an
176 // exception on EOF. We want to throw an exception only if EOF occurs after
177 // partial size data.
178 int32_t sz = -1;
179 uint32_t size_bytes_read = 0;
180 while (size_bytes_read < sizeof(sz)) {
181 uint8_t* szp = reinterpret_cast<uint8_t*>(&sz) + size_bytes_read;
182 uint32_t bytes_read
183 = transport_->read(szp, static_cast<uint32_t>(sizeof(sz)) - size_bytes_read);
184 if (bytes_read == 0) {
185 if (size_bytes_read == 0) {
186 // EOF before any data was read.
187 return false;
188 } else {
189 // EOF after a partial frame header. Raise an exception.
190 throw TTransportException(TTransportException::END_OF_FILE,
191 "No more data to read after "
192 "partial frame header.");
193 }
194 }
195 size_bytes_read += bytes_read;
196 }
197
198 sz = ntohl(sz);
199
200 if (sz < 0) {
201 throw TTransportException("Frame size has negative value");
202 }
203
204 // Check for oversized frame
205 if (sz > static_cast<int32_t>(maxFrameSize_))
206 throw TTransportException(TTransportException::CORRUPTED_DATA, "Received an oversized frame");
207
208 // Read the frame payload, and reset markers.
209 if (sz > static_cast<int32_t>(rBufSize_)) {
210 rBuf_.reset(new uint8_t[sz]);
211 rBufSize_ = sz;
212 }
213 transport_->readAll(rBuf_.get(), sz);
214 setReadBuffer(rBuf_.get(), sz);
215 return true;
216 }
217
218 void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
219 // Double buffer size until sufficient.
220 auto have = static_cast<uint32_t>(wBase_ - wBuf_.get());
221 uint32_t new_size = wBufSize_;
222 if (len + have < have /* overflow */ || len + have > 0x7fffffff) {
223 throw TTransportException(TTransportException::BAD_ARGS,
224 "Attempted to write over 2 GB to TFramedTransport.");
225 }
226 while (new_size < len + have) {
227 new_size = new_size > 0 ? new_size * 2 : 1;
228 }
229
230 // TODO(dreiss): Consider modifying this class to use malloc/free
231 // so we can use realloc here.
232
233 // Allocate new buffer.
234 auto* new_buf = new uint8_t[new_size];
235
236 // Copy the old buffer to the new one.
237 memcpy(new_buf, wBuf_.get(), have);
238
239 // Now point buf to the new one.
240 wBuf_.reset(new_buf);
241 wBufSize_ = new_size;
242 wBase_ = wBuf_.get() + have;
243 wBound_ = wBuf_.get() + wBufSize_;
244
245 // Copy the data into the new buffer.
246 memcpy(wBase_, buf, len);
247 wBase_ += len;
248 }
249
250 void TFramedTransport::flush() {
251 int32_t sz_hbo, sz_nbo;
252 assert(wBufSize_ > sizeof(sz_nbo));
253
254 // Slip the frame size into the start of the buffer.
255 sz_hbo = static_cast<uint32_t>(wBase_ - (wBuf_.get() + sizeof(sz_nbo)));
256 sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo));
257 memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo));
258
259 if (sz_hbo > 0) {
260 // Note that we reset wBase_ (with a pad for the frame size)
261 // prior to the underlying write to ensure we're in a sane state
262 // (i.e. internal buffer cleaned) if the underlying write throws
263 // up an exception
264 wBase_ = wBuf_.get() + sizeof(sz_nbo);
265
266 // Write size and frame body.
267 transport_->write(wBuf_.get(), static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo);
268 }
269
270 // Flush the underlying transport.
271 transport_->flush();
272
273 // reclaim write buffer
274 if (wBufSize_ > bufReclaimThresh_) {
275 wBufSize_ = DEFAULT_BUFFER_SIZE;
276 wBuf_.reset(new uint8_t[wBufSize_]);
277 setWriteBuffer(wBuf_.get(), wBufSize_);
278
279 // reset wBase_ with a pad for the frame size
280 int32_t pad = 0;
281 wBase_ = wBuf_.get() + sizeof(pad);
282 }
283 }
284
285 uint32_t TFramedTransport::writeEnd() {
286 return static_cast<uint32_t>(wBase_ - wBuf_.get());
287 }
288
289 const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
290 (void)buf;
291 (void)len;
292 // Don't try to be clever with shifting buffers.
293 // If the fast path failed let the protocol use its slow path.
294 // Besides, who is going to try to borrow across messages?
295 return nullptr;
296 }
297
298 uint32_t TFramedTransport::readEnd() {
299 // include framing bytes
300 auto bytes_read = static_cast<uint32_t>(rBound_ - rBuf_.get() + sizeof(uint32_t));
301
302 if (rBufSize_ > bufReclaimThresh_) {
303 rBufSize_ = 0;
304 rBuf_.reset();
305 setReadBuffer(rBuf_.get(), rBufSize_);
306 }
307
308 return bytes_read;
309 }
310
311 void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) {
312 // Correct rBound_ so we can use the fast path in the future.
313 rBound_ = wBase_;
314
315 // Decide how much to give.
316 uint32_t give = (std::min)(len, available_read());
317
318 *out_start = rBase_;
319 *out_give = give;
320
321 // Preincrement rBase_ so the caller doesn't have to.
322 rBase_ += give;
323 }
324
325 uint32_t TMemoryBuffer::readSlow(uint8_t* buf, uint32_t len) {
326 uint8_t* start;
327 uint32_t give;
328 computeRead(len, &start, &give);
329
330 // Copy into the provided buffer.
331 memcpy(buf, start, give);
332
333 return give;
334 }
335
336 uint32_t TMemoryBuffer::readAppendToString(std::string& str, uint32_t len) {
337 // Don't get some stupid assertion failure.
338 if (buffer_ == nullptr) {
339 return 0;
340 }
341
342 uint8_t* start;
343 uint32_t give;
344 computeRead(len, &start, &give);
345
346 // Append to the provided string.
347 str.append((char*)start, give);
348
349 return give;
350 }
351
352 void TMemoryBuffer::ensureCanWrite(uint32_t len) {
353 // Check available space
354 uint32_t avail = available_write();
355 if (len <= avail) {
356 return;
357 }
358
359 if (!owner_) {
360 throw TTransportException("Insufficient space in external MemoryBuffer");
361 }
362
363 // Grow the buffer as necessary.
364 uint64_t new_size = bufferSize_;
365 while (len > avail) {
366 new_size = new_size > 0 ? new_size * 2 : 1;
367 if (new_size > maxBufferSize_) {
368 throw TTransportException(TTransportException::BAD_ARGS,
369 "Internal buffer size overflow");
370 }
371 avail = available_write() + (static_cast<uint32_t>(new_size) - bufferSize_);
372 }
373
374 // Allocate into a new pointer so we don't bork ours if it fails.
375 auto* new_buffer = static_cast<uint8_t*>(std::realloc(buffer_, new_size));
376 if (new_buffer == nullptr) {
377 throw std::bad_alloc();
378 }
379
380 rBase_ = new_buffer + (rBase_ - buffer_);
381 rBound_ = new_buffer + (rBound_ - buffer_);
382 wBase_ = new_buffer + (wBase_ - buffer_);
383 wBound_ = new_buffer + new_size;
384 buffer_ = new_buffer;
385 bufferSize_ = static_cast<uint32_t>(new_size);
386 }
387
388 void TMemoryBuffer::writeSlow(const uint8_t* buf, uint32_t len) {
389 ensureCanWrite(len);
390
391 // Copy into the buffer and increment wBase_.
392 memcpy(wBase_, buf, len);
393 wBase_ += len;
394 }
395
396 void TMemoryBuffer::wroteBytes(uint32_t len) {
397 uint32_t avail = available_write();
398 if (len > avail) {
399 throw TTransportException("Client wrote more bytes than size of buffer.");
400 }
401 wBase_ += len;
402 }
403
404 const uint8_t* TMemoryBuffer::borrowSlow(uint8_t* buf, uint32_t* len) {
405 (void)buf;
406 rBound_ = wBase_;
407 if (available_read() >= *len) {
408 *len = available_read();
409 return rBase_;
410 }
411 return nullptr;
412 }
413 }
414 }
415 } // apache::thrift::transport