]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | #include <cassert> | |
21 | #include <algorithm> | |
22 | ||
23 | #include <thrift/transport/TBufferTransports.h> | |
24 | ||
25 | using std::string; | |
26 | ||
27 | namespace apache { | |
28 | namespace thrift { | |
29 | namespace transport { | |
30 | ||
31 | uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) { | |
32 | auto have = static_cast<uint32_t>(rBound_ - rBase_); | |
33 | ||
34 | // We should only take the slow path if we can't satisfy the read | |
35 | // with the data already in the buffer. | |
36 | assert(have < len); | |
37 | ||
38 | // If we have some data in the buffer, copy it out and return it. | |
39 | // We have to return it without attempting to read more, since we aren't | |
40 | // guaranteed that the underlying transport actually has more data, so | |
41 | // attempting to read from it could block. | |
42 | if (have > 0) { | |
43 | memcpy(buf, rBase_, have); | |
44 | setReadBuffer(rBuf_.get(), 0); | |
45 | return have; | |
46 | } | |
47 | ||
48 | // No data is available in our buffer. | |
49 | // Get more from underlying transport up to buffer size. | |
50 | // Note that this makes a lot of sense if len < rBufSize_ | |
51 | // and almost no sense otherwise. TODO(dreiss): Fix that | |
52 | // case (possibly including some readv hotness). | |
53 | setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_)); | |
54 | ||
55 | // Hand over whatever we have. | |
56 | uint32_t give = (std::min)(len, static_cast<uint32_t>(rBound_ - rBase_)); | |
57 | memcpy(buf, rBase_, give); | |
58 | rBase_ += give; | |
59 | ||
60 | return give; | |
61 | } | |
62 | ||
63 | void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) { | |
64 | auto have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get()); | |
65 | auto space = static_cast<uint32_t>(wBound_ - wBase_); | |
66 | // We should only take the slow path if we can't accommodate the write | |
67 | // with the free space already in the buffer. | |
68 | assert(wBound_ - wBase_ < static_cast<ptrdiff_t>(len)); | |
69 | ||
70 | // Now here's the tricky question: should we copy data from buf into our | |
71 | // internal buffer and write it from there, or should we just write out | |
72 | // the current internal buffer in one syscall and write out buf in another. | |
73 | // If our currently buffered data plus buf is at least double our buffer | |
74 | // size, we will have to do two syscalls no matter what (except in the | |
75 | // degenerate case when our buffer is empty), so there is no use copying. | |
76 | // Otherwise, there is sort of a sliding scale. If we have N-1 bytes | |
77 | // buffered and need to write 2, it would be crazy to do two syscalls. | |
78 | // On the other hand, if we have 2 bytes buffered and are writing 2N-3, | |
79 | // we can save a syscall in the short term by loading up our buffer, writing | |
80 | // it out, and copying the rest of the bytes into our buffer. Of course, | |
81 | // if we get another 2-byte write, we haven't saved any syscalls at all, | |
82 | // and have just copied nearly 2N bytes for nothing. Finding a perfect | |
83 | // policy would require predicting the size of future writes, so we're just | |
84 | // going to always eschew syscalls if we have less than 2N bytes to write. | |
85 | ||
86 | // The case where we have to do two syscalls. | |
87 | // This case also covers the case where the buffer is empty, | |
88 | // but it is clearer (I think) to think of it as two separate cases. | |
89 | if ((have_bytes + len >= 2 * wBufSize_) || (have_bytes == 0)) { | |
90 | // TODO(dreiss): writev | |
91 | if (have_bytes > 0) { | |
92 | transport_->write(wBuf_.get(), have_bytes); | |
93 | } | |
94 | transport_->write(buf, len); | |
95 | wBase_ = wBuf_.get(); | |
96 | return; | |
97 | } | |
98 | ||
99 | // Fill up our internal buffer for a write. | |
100 | memcpy(wBase_, buf, space); | |
101 | buf += space; | |
102 | len -= space; | |
103 | transport_->write(wBuf_.get(), wBufSize_); | |
104 | ||
105 | // Copy the rest into our buffer. | |
106 | assert(len < wBufSize_); | |
107 | memcpy(wBuf_.get(), buf, len); | |
108 | wBase_ = wBuf_.get() + len; | |
109 | return; | |
110 | } | |
111 | ||
112 | const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) { | |
113 | (void)buf; | |
114 | (void)len; | |
115 | // Simply return NULL. We don't know if there is actually data available on | |
116 | // the underlying transport, so calling read() might block. | |
117 | return nullptr; | |
118 | } | |
119 | ||
120 | void TBufferedTransport::flush() { | |
121 | // Write out any data waiting in the write buffer. | |
122 | auto have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get()); | |
123 | if (have_bytes > 0) { | |
124 | // Note that we reset wBase_ prior to the underlying write | |
125 | // to ensure we're in a sane state (i.e. internal buffer cleaned) | |
126 | // if the underlying write throws up an exception | |
127 | wBase_ = wBuf_.get(); | |
128 | transport_->write(wBuf_.get(), have_bytes); | |
129 | } | |
130 | ||
131 | // Flush the underlying transport. | |
132 | transport_->flush(); | |
133 | } | |
134 | ||
135 | uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) { | |
136 | uint32_t want = len; | |
137 | auto have = static_cast<uint32_t>(rBound_ - rBase_); | |
138 | ||
139 | // We should only take the slow path if we can't satisfy the read | |
140 | // with the data already in the buffer. | |
141 | assert(have < want); | |
142 | ||
143 | // If we have some data in the buffer, copy it out and return it. | |
144 | // We have to return it without attempting to read more, since we aren't | |
145 | // guaranteed that the underlying transport actually has more data, so | |
146 | // attempting to read from it could block. | |
147 | if (have > 0) { | |
148 | memcpy(buf, rBase_, have); | |
149 | setReadBuffer(rBuf_.get(), 0); | |
150 | return have; | |
151 | } | |
152 | ||
153 | // Read another frame. | |
154 | if (!readFrame()) { | |
155 | // EOF. No frame available. | |
156 | return 0; | |
157 | } | |
158 | ||
159 | // TODO(dreiss): Should we warn when reads cross frames? | |
160 | ||
161 | // Hand over whatever we have. | |
162 | uint32_t give = (std::min)(want, static_cast<uint32_t>(rBound_ - rBase_)); | |
163 | memcpy(buf, rBase_, give); | |
164 | rBase_ += give; | |
165 | want -= give; | |
166 | ||
167 | return (len - want); | |
168 | } | |
169 | ||
170 | bool TFramedTransport::readFrame() { | |
171 | // TODO(dreiss): Think about using readv here, even though it would | |
172 | // result in (gasp) read-ahead. | |
173 | ||
174 | // Read the size of the next frame. | |
175 | // We can't use readAll(&sz, sizeof(sz)), since that always throws an | |
176 | // exception on EOF. We want to throw an exception only if EOF occurs after | |
177 | // partial size data. | |
178 | int32_t sz = -1; | |
179 | uint32_t size_bytes_read = 0; | |
180 | while (size_bytes_read < sizeof(sz)) { | |
181 | uint8_t* szp = reinterpret_cast<uint8_t*>(&sz) + size_bytes_read; | |
182 | uint32_t bytes_read | |
183 | = transport_->read(szp, static_cast<uint32_t>(sizeof(sz)) - size_bytes_read); | |
184 | if (bytes_read == 0) { | |
185 | if (size_bytes_read == 0) { | |
186 | // EOF before any data was read. | |
187 | return false; | |
188 | } else { | |
189 | // EOF after a partial frame header. Raise an exception. | |
190 | throw TTransportException(TTransportException::END_OF_FILE, | |
191 | "No more data to read after " | |
192 | "partial frame header."); | |
193 | } | |
194 | } | |
195 | size_bytes_read += bytes_read; | |
196 | } | |
197 | ||
198 | sz = ntohl(sz); | |
199 | ||
200 | if (sz < 0) { | |
201 | throw TTransportException("Frame size has negative value"); | |
202 | } | |
203 | ||
204 | // Check for oversized frame | |
205 | if (sz > static_cast<int32_t>(maxFrameSize_)) | |
206 | throw TTransportException(TTransportException::CORRUPTED_DATA, "Received an oversized frame"); | |
207 | ||
208 | // Read the frame payload, and reset markers. | |
209 | if (sz > static_cast<int32_t>(rBufSize_)) { | |
210 | rBuf_.reset(new uint8_t[sz]); | |
211 | rBufSize_ = sz; | |
212 | } | |
213 | transport_->readAll(rBuf_.get(), sz); | |
214 | setReadBuffer(rBuf_.get(), sz); | |
215 | return true; | |
216 | } | |
217 | ||
218 | void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) { | |
219 | // Double buffer size until sufficient. | |
220 | auto have = static_cast<uint32_t>(wBase_ - wBuf_.get()); | |
221 | uint32_t new_size = wBufSize_; | |
222 | if (len + have < have /* overflow */ || len + have > 0x7fffffff) { | |
223 | throw TTransportException(TTransportException::BAD_ARGS, | |
224 | "Attempted to write over 2 GB to TFramedTransport."); | |
225 | } | |
226 | while (new_size < len + have) { | |
227 | new_size = new_size > 0 ? new_size * 2 : 1; | |
228 | } | |
229 | ||
230 | // TODO(dreiss): Consider modifying this class to use malloc/free | |
231 | // so we can use realloc here. | |
232 | ||
233 | // Allocate new buffer. | |
234 | auto* new_buf = new uint8_t[new_size]; | |
235 | ||
236 | // Copy the old buffer to the new one. | |
237 | memcpy(new_buf, wBuf_.get(), have); | |
238 | ||
239 | // Now point buf to the new one. | |
240 | wBuf_.reset(new_buf); | |
241 | wBufSize_ = new_size; | |
242 | wBase_ = wBuf_.get() + have; | |
243 | wBound_ = wBuf_.get() + wBufSize_; | |
244 | ||
245 | // Copy the data into the new buffer. | |
246 | memcpy(wBase_, buf, len); | |
247 | wBase_ += len; | |
248 | } | |
249 | ||
250 | void TFramedTransport::flush() { | |
251 | int32_t sz_hbo, sz_nbo; | |
252 | assert(wBufSize_ > sizeof(sz_nbo)); | |
253 | ||
254 | // Slip the frame size into the start of the buffer. | |
255 | sz_hbo = static_cast<uint32_t>(wBase_ - (wBuf_.get() + sizeof(sz_nbo))); | |
256 | sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo)); | |
257 | memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo)); | |
258 | ||
259 | if (sz_hbo > 0) { | |
260 | // Note that we reset wBase_ (with a pad for the frame size) | |
261 | // prior to the underlying write to ensure we're in a sane state | |
262 | // (i.e. internal buffer cleaned) if the underlying write throws | |
263 | // up an exception | |
264 | wBase_ = wBuf_.get() + sizeof(sz_nbo); | |
265 | ||
266 | // Write size and frame body. | |
267 | transport_->write(wBuf_.get(), static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo); | |
268 | } | |
269 | ||
270 | // Flush the underlying transport. | |
271 | transport_->flush(); | |
272 | ||
273 | // reclaim write buffer | |
274 | if (wBufSize_ > bufReclaimThresh_) { | |
275 | wBufSize_ = DEFAULT_BUFFER_SIZE; | |
276 | wBuf_.reset(new uint8_t[wBufSize_]); | |
277 | setWriteBuffer(wBuf_.get(), wBufSize_); | |
278 | ||
279 | // reset wBase_ with a pad for the frame size | |
280 | int32_t pad = 0; | |
281 | wBase_ = wBuf_.get() + sizeof(pad); | |
282 | } | |
283 | } | |
284 | ||
285 | uint32_t TFramedTransport::writeEnd() { | |
286 | return static_cast<uint32_t>(wBase_ - wBuf_.get()); | |
287 | } | |
288 | ||
289 | const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) { | |
290 | (void)buf; | |
291 | (void)len; | |
292 | // Don't try to be clever with shifting buffers. | |
293 | // If the fast path failed let the protocol use its slow path. | |
294 | // Besides, who is going to try to borrow across messages? | |
295 | return nullptr; | |
296 | } | |
297 | ||
298 | uint32_t TFramedTransport::readEnd() { | |
299 | // include framing bytes | |
300 | auto bytes_read = static_cast<uint32_t>(rBound_ - rBuf_.get() + sizeof(uint32_t)); | |
301 | ||
302 | if (rBufSize_ > bufReclaimThresh_) { | |
303 | rBufSize_ = 0; | |
304 | rBuf_.reset(); | |
305 | setReadBuffer(rBuf_.get(), rBufSize_); | |
306 | } | |
307 | ||
308 | return bytes_read; | |
309 | } | |
310 | ||
311 | void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) { | |
312 | // Correct rBound_ so we can use the fast path in the future. | |
313 | rBound_ = wBase_; | |
314 | ||
315 | // Decide how much to give. | |
316 | uint32_t give = (std::min)(len, available_read()); | |
317 | ||
318 | *out_start = rBase_; | |
319 | *out_give = give; | |
320 | ||
321 | // Preincrement rBase_ so the caller doesn't have to. | |
322 | rBase_ += give; | |
323 | } | |
324 | ||
325 | uint32_t TMemoryBuffer::readSlow(uint8_t* buf, uint32_t len) { | |
326 | uint8_t* start; | |
327 | uint32_t give; | |
328 | computeRead(len, &start, &give); | |
329 | ||
330 | // Copy into the provided buffer. | |
331 | memcpy(buf, start, give); | |
332 | ||
333 | return give; | |
334 | } | |
335 | ||
336 | uint32_t TMemoryBuffer::readAppendToString(std::string& str, uint32_t len) { | |
337 | // Don't get some stupid assertion failure. | |
338 | if (buffer_ == nullptr) { | |
339 | return 0; | |
340 | } | |
341 | ||
342 | uint8_t* start; | |
343 | uint32_t give; | |
344 | computeRead(len, &start, &give); | |
345 | ||
346 | // Append to the provided string. | |
347 | str.append((char*)start, give); | |
348 | ||
349 | return give; | |
350 | } | |
351 | ||
352 | void TMemoryBuffer::ensureCanWrite(uint32_t len) { | |
353 | // Check available space | |
354 | uint32_t avail = available_write(); | |
355 | if (len <= avail) { | |
356 | return; | |
357 | } | |
358 | ||
359 | if (!owner_) { | |
360 | throw TTransportException("Insufficient space in external MemoryBuffer"); | |
361 | } | |
362 | ||
363 | // Grow the buffer as necessary. | |
364 | uint64_t new_size = bufferSize_; | |
365 | while (len > avail) { | |
366 | new_size = new_size > 0 ? new_size * 2 : 1; | |
367 | if (new_size > maxBufferSize_) { | |
368 | throw TTransportException(TTransportException::BAD_ARGS, | |
369 | "Internal buffer size overflow"); | |
370 | } | |
371 | avail = available_write() + (static_cast<uint32_t>(new_size) - bufferSize_); | |
372 | } | |
373 | ||
374 | // Allocate into a new pointer so we don't bork ours if it fails. | |
375 | auto* new_buffer = static_cast<uint8_t*>(std::realloc(buffer_, new_size)); | |
376 | if (new_buffer == nullptr) { | |
377 | throw std::bad_alloc(); | |
378 | } | |
379 | ||
380 | rBase_ = new_buffer + (rBase_ - buffer_); | |
381 | rBound_ = new_buffer + (rBound_ - buffer_); | |
382 | wBase_ = new_buffer + (wBase_ - buffer_); | |
383 | wBound_ = new_buffer + new_size; | |
384 | buffer_ = new_buffer; | |
385 | bufferSize_ = static_cast<uint32_t>(new_size); | |
386 | } | |
387 | ||
388 | void TMemoryBuffer::writeSlow(const uint8_t* buf, uint32_t len) { | |
389 | ensureCanWrite(len); | |
390 | ||
391 | // Copy into the buffer and increment wBase_. | |
392 | memcpy(wBase_, buf, len); | |
393 | wBase_ += len; | |
394 | } | |
395 | ||
396 | void TMemoryBuffer::wroteBytes(uint32_t len) { | |
397 | uint32_t avail = available_write(); | |
398 | if (len > avail) { | |
399 | throw TTransportException("Client wrote more bytes than size of buffer."); | |
400 | } | |
401 | wBase_ += len; | |
402 | } | |
403 | ||
404 | const uint8_t* TMemoryBuffer::borrowSlow(uint8_t* buf, uint32_t* len) { | |
405 | (void)buf; | |
406 | rBound_ = wBase_; | |
407 | if (available_read() >= *len) { | |
408 | *len = available_read(); | |
409 | return rBase_; | |
410 | } | |
411 | return nullptr; | |
412 | } | |
413 | } | |
414 | } | |
415 | } // apache::thrift::transport |