]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ | |
21 | #define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1 | |
22 | ||
23 | #include <cstdlib> | |
24 | #include <cstring> | |
25 | #include <limits> | |
26 | #include <boost/scoped_array.hpp> | |
27 | ||
28 | #include <thrift/transport/TTransport.h> | |
29 | #include <thrift/transport/TVirtualTransport.h> | |
30 | ||
31 | #ifdef __GNUC__ | |
32 | #define TDB_LIKELY(val) (__builtin_expect((val), 1)) | |
33 | #define TDB_UNLIKELY(val) (__builtin_expect((val), 0)) | |
34 | #else | |
35 | #define TDB_LIKELY(val) (val) | |
36 | #define TDB_UNLIKELY(val) (val) | |
37 | #endif | |
38 | ||
39 | namespace apache { | |
40 | namespace thrift { | |
41 | namespace transport { | |
42 | ||
43 | /** | |
44 | * Base class for all transports that use read/write buffers for performance. | |
45 | * | |
46 | * TBufferBase is designed to implement the fast-path "memcpy" style | |
47 | * operations that work in the common case. It does so with small and | |
48 | * (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract | |
49 | * class. Subclasses are expected to define the "slow path" operations | |
50 | * that have to be done when the buffers are full or empty. | |
51 | * | |
52 | */ | |
53 | class TBufferBase : public TVirtualTransport<TBufferBase> { | |
54 | ||
55 | public: | |
56 | /** | |
57 | * Fast-path read. | |
58 | * | |
59 | * When we have enough data buffered to fulfill the read, we can satisfy it | |
60 | * with a single memcpy, then adjust our internal pointers. If the buffer | |
61 | * is empty, we call out to our slow path, implemented by a subclass. | |
62 | * This method is meant to eventually be nonvirtual and inlinable. | |
63 | */ | |
64 | uint32_t read(uint8_t* buf, uint32_t len) { | |
65 | uint8_t* new_rBase = rBase_ + len; | |
66 | if (TDB_LIKELY(new_rBase <= rBound_)) { | |
67 | std::memcpy(buf, rBase_, len); | |
68 | rBase_ = new_rBase; | |
69 | return len; | |
70 | } | |
71 | return readSlow(buf, len); | |
72 | } | |
73 | ||
74 | /** | |
75 | * Shortcutted version of readAll. | |
76 | */ | |
77 | uint32_t readAll(uint8_t* buf, uint32_t len) { | |
78 | uint8_t* new_rBase = rBase_ + len; | |
79 | if (TDB_LIKELY(new_rBase <= rBound_)) { | |
80 | std::memcpy(buf, rBase_, len); | |
81 | rBase_ = new_rBase; | |
82 | return len; | |
83 | } | |
84 | return apache::thrift::transport::readAll(*this, buf, len); | |
85 | } | |
86 | ||
87 | /** | |
88 | * Fast-path write. | |
89 | * | |
90 | * When we have enough empty space in our buffer to accommodate the write, we | |
91 | * can satisfy it with a single memcpy, then adjust our internal pointers. | |
92 | * If the buffer is full, we call out to our slow path, implemented by a | |
93 | * subclass. This method is meant to eventually be nonvirtual and | |
94 | * inlinable. | |
95 | */ | |
96 | void write(const uint8_t* buf, uint32_t len) { | |
97 | uint8_t* new_wBase = wBase_ + len; | |
98 | if (TDB_LIKELY(new_wBase <= wBound_)) { | |
99 | std::memcpy(wBase_, buf, len); | |
100 | wBase_ = new_wBase; | |
101 | return; | |
102 | } | |
103 | writeSlow(buf, len); | |
104 | } | |
105 | ||
106 | /** | |
107 | * Fast-path borrow. A lot like the fast-path read. | |
108 | */ | |
109 | const uint8_t* borrow(uint8_t* buf, uint32_t* len) { | |
110 | if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) { | |
111 | // With strict aliasing, writing to len shouldn't force us to | |
112 | // refetch rBase_ from memory. TODO(dreiss): Verify this. | |
113 | *len = static_cast<uint32_t>(rBound_ - rBase_); | |
114 | return rBase_; | |
115 | } | |
116 | return borrowSlow(buf, len); | |
117 | } | |
118 | ||
119 | /** | |
120 | * Consume doesn't require a slow path. | |
121 | */ | |
122 | void consume(uint32_t len) { | |
123 | if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) { | |
124 | rBase_ += len; | |
125 | } else { | |
126 | throw TTransportException(TTransportException::BAD_ARGS, "consume did not follow a borrow."); | |
127 | } | |
128 | } | |
129 | ||
130 | protected: | |
131 | /// Slow path read. | |
132 | virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0; | |
133 | ||
134 | /// Slow path write. | |
135 | virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0; | |
136 | ||
137 | /** | |
138 | * Slow path borrow. | |
139 | * | |
140 | * POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len | |
141 | */ | |
142 | virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0; | |
143 | ||
144 | /** | |
145 | * Trivial constructor. | |
146 | * | |
147 | * Initialize pointers safely. Constructing is not a very | |
148 | * performance-sensitive operation, so it is okay to just leave it to | |
149 | * the concrete class to set up pointers correctly. | |
150 | */ | |
151 | TBufferBase() : rBase_(nullptr), rBound_(nullptr), wBase_(nullptr), wBound_(nullptr) {} | |
152 | ||
153 | /// Convenience mutator for setting the read buffer. | |
154 | void setReadBuffer(uint8_t* buf, uint32_t len) { | |
155 | rBase_ = buf; | |
156 | rBound_ = buf + len; | |
157 | } | |
158 | ||
159 | /// Convenience mutator for setting the write buffer. | |
160 | void setWriteBuffer(uint8_t* buf, uint32_t len) { | |
161 | wBase_ = buf; | |
162 | wBound_ = buf + len; | |
163 | } | |
164 | ||
165 | ~TBufferBase() override = default; | |
166 | ||
167 | /// Reads begin here. | |
168 | uint8_t* rBase_; | |
169 | /// Reads may extend to just before here. | |
170 | uint8_t* rBound_; | |
171 | ||
172 | /// Writes begin here. | |
173 | uint8_t* wBase_; | |
174 | /// Writes may extend to just before here. | |
175 | uint8_t* wBound_; | |
176 | }; | |
177 | ||
178 | /** | |
179 | * Buffered transport. For reads it will read more data than is requested | |
180 | * and will serve future data out of a local buffer. For writes, data is | |
181 | * stored to an in memory buffer before being written out. | |
182 | * | |
183 | */ | |
184 | class TBufferedTransport : public TVirtualTransport<TBufferedTransport, TBufferBase> { | |
185 | public: | |
186 | static const int DEFAULT_BUFFER_SIZE = 512; | |
187 | ||
188 | /// Use default buffer sizes. | |
189 | TBufferedTransport(std::shared_ptr<TTransport> transport) | |
190 | : transport_(transport), | |
191 | rBufSize_(DEFAULT_BUFFER_SIZE), | |
192 | wBufSize_(DEFAULT_BUFFER_SIZE), | |
193 | rBuf_(new uint8_t[rBufSize_]), | |
194 | wBuf_(new uint8_t[wBufSize_]) { | |
195 | initPointers(); | |
196 | } | |
197 | ||
198 | /// Use specified buffer sizes. | |
199 | TBufferedTransport(std::shared_ptr<TTransport> transport, uint32_t sz) | |
200 | : transport_(transport), | |
201 | rBufSize_(sz), | |
202 | wBufSize_(sz), | |
203 | rBuf_(new uint8_t[rBufSize_]), | |
204 | wBuf_(new uint8_t[wBufSize_]) { | |
205 | initPointers(); | |
206 | } | |
207 | ||
208 | /// Use specified read and write buffer sizes. | |
209 | TBufferedTransport(std::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz) | |
210 | : transport_(transport), | |
211 | rBufSize_(rsz), | |
212 | wBufSize_(wsz), | |
213 | rBuf_(new uint8_t[rBufSize_]), | |
214 | wBuf_(new uint8_t[wBufSize_]) { | |
215 | initPointers(); | |
216 | } | |
217 | ||
218 | void open() override { transport_->open(); } | |
219 | ||
220 | bool isOpen() const override { return transport_->isOpen(); } | |
221 | ||
222 | bool peek() override { | |
223 | if (rBase_ == rBound_) { | |
224 | setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_)); | |
225 | } | |
226 | return (rBound_ > rBase_); | |
227 | } | |
228 | ||
229 | void close() override { | |
230 | flush(); | |
231 | transport_->close(); | |
232 | } | |
233 | ||
234 | uint32_t readSlow(uint8_t* buf, uint32_t len) override; | |
235 | ||
236 | void writeSlow(const uint8_t* buf, uint32_t len) override; | |
237 | ||
238 | void flush() override; | |
239 | ||
240 | /** | |
241 | * Returns the origin of the underlying transport | |
242 | */ | |
243 | const std::string getOrigin() const override { return transport_->getOrigin(); } | |
244 | ||
245 | /** | |
246 | * The following behavior is currently implemented by TBufferedTransport, | |
247 | * but that may change in a future version: | |
248 | * 1/ If len is at most rBufSize_, borrow will never return NULL. | |
249 | * Depending on the underlying transport, it could throw an exception | |
250 | * or hang forever. | |
251 | * 2/ Some borrow requests may copy bytes internally. However, | |
252 | * if len is at most rBufSize_/2, none of the copied bytes | |
253 | * will ever have to be copied again. For optimial performance, | |
254 | * stay under this limit. | |
255 | */ | |
256 | const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override; | |
257 | ||
258 | std::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; } | |
259 | ||
260 | /* | |
261 | * TVirtualTransport provides a default implementation of readAll(). | |
262 | * We want to use the TBufferBase version instead. | |
263 | */ | |
264 | uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); } | |
265 | ||
266 | protected: | |
267 | void initPointers() { | |
268 | setReadBuffer(rBuf_.get(), 0); | |
269 | setWriteBuffer(wBuf_.get(), wBufSize_); | |
270 | // Write size never changes. | |
271 | } | |
272 | ||
273 | std::shared_ptr<TTransport> transport_; | |
274 | ||
275 | uint32_t rBufSize_; | |
276 | uint32_t wBufSize_; | |
277 | boost::scoped_array<uint8_t> rBuf_; | |
278 | boost::scoped_array<uint8_t> wBuf_; | |
279 | }; | |
280 | ||
281 | /** | |
282 | * Wraps a transport into a buffered one. | |
283 | * | |
284 | */ | |
285 | class TBufferedTransportFactory : public TTransportFactory { | |
286 | public: | |
287 | TBufferedTransportFactory() = default; | |
288 | ||
289 | ~TBufferedTransportFactory() override = default; | |
290 | ||
291 | /** | |
292 | * Wraps the transport into a buffered one. | |
293 | */ | |
294 | std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override { | |
295 | return std::shared_ptr<TTransport>(new TBufferedTransport(trans)); | |
296 | } | |
297 | }; | |
298 | ||
299 | /** | |
300 | * Framed transport. All writes go into an in-memory buffer until flush is | |
301 | * called, at which point the transport writes the length of the entire | |
302 | * binary chunk followed by the data payload. This allows the receiver on the | |
303 | * other end to always do fixed-length reads. | |
304 | * | |
305 | */ | |
306 | class TFramedTransport : public TVirtualTransport<TFramedTransport, TBufferBase> { | |
307 | public: | |
308 | static const int DEFAULT_BUFFER_SIZE = 512; | |
309 | static const int DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024; | |
310 | ||
311 | /// Use default buffer sizes. | |
312 | TFramedTransport() | |
313 | : transport_(), | |
314 | rBufSize_(0), | |
315 | wBufSize_(DEFAULT_BUFFER_SIZE), | |
316 | rBuf_(), | |
317 | wBuf_(new uint8_t[wBufSize_]), | |
318 | bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()) { | |
319 | initPointers(); | |
320 | } | |
321 | ||
322 | TFramedTransport(std::shared_ptr<TTransport> transport) | |
323 | : transport_(transport), | |
324 | rBufSize_(0), | |
325 | wBufSize_(DEFAULT_BUFFER_SIZE), | |
326 | rBuf_(), | |
327 | wBuf_(new uint8_t[wBufSize_]), | |
328 | bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()), | |
329 | maxFrameSize_(DEFAULT_MAX_FRAME_SIZE) { | |
330 | initPointers(); | |
331 | } | |
332 | ||
333 | TFramedTransport(std::shared_ptr<TTransport> transport, | |
334 | uint32_t sz, | |
335 | uint32_t bufReclaimThresh = (std::numeric_limits<uint32_t>::max)()) | |
336 | : transport_(transport), | |
337 | rBufSize_(0), | |
338 | wBufSize_(sz), | |
339 | rBuf_(), | |
340 | wBuf_(new uint8_t[wBufSize_]), | |
341 | bufReclaimThresh_(bufReclaimThresh), | |
342 | maxFrameSize_(DEFAULT_MAX_FRAME_SIZE) { | |
343 | initPointers(); | |
344 | } | |
345 | ||
346 | void open() override { transport_->open(); } | |
347 | ||
348 | bool isOpen() const override { return transport_->isOpen(); } | |
349 | ||
350 | bool peek() override { return (rBase_ < rBound_) || transport_->peek(); } | |
351 | ||
352 | void close() override { | |
353 | flush(); | |
354 | transport_->close(); | |
355 | } | |
356 | ||
357 | uint32_t readSlow(uint8_t* buf, uint32_t len) override; | |
358 | ||
359 | void writeSlow(const uint8_t* buf, uint32_t len) override; | |
360 | ||
361 | void flush() override; | |
362 | ||
363 | uint32_t readEnd() override; | |
364 | ||
365 | uint32_t writeEnd() override; | |
366 | ||
367 | const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override; | |
368 | ||
369 | std::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; } | |
370 | ||
371 | /* | |
372 | * TVirtualTransport provides a default implementation of readAll(). | |
373 | * We want to use the TBufferBase version instead. | |
374 | */ | |
375 | using TBufferBase::readAll; | |
376 | ||
377 | /** | |
378 | * Returns the origin of the underlying transport | |
379 | */ | |
380 | const std::string getOrigin() const override { return transport_->getOrigin(); } | |
381 | ||
382 | /** | |
383 | * Set the maximum size of the frame at read | |
384 | */ | |
385 | void setMaxFrameSize(uint32_t maxFrameSize) { maxFrameSize_ = maxFrameSize; } | |
386 | ||
387 | /** | |
388 | * Get the maximum size of the frame at read | |
389 | */ | |
390 | uint32_t getMaxFrameSize() { return maxFrameSize_; } | |
391 | ||
392 | protected: | |
393 | /** | |
394 | * Reads a frame of input from the underlying stream. | |
395 | * | |
396 | * Returns true if a frame was read successfully, or false on EOF. | |
397 | * (Raises a TTransportException if EOF occurs after a partial frame.) | |
398 | */ | |
399 | virtual bool readFrame(); | |
400 | ||
401 | void initPointers() { | |
402 | setReadBuffer(nullptr, 0); | |
403 | setWriteBuffer(wBuf_.get(), wBufSize_); | |
404 | ||
405 | // Pad the buffer so we can insert the size later. | |
406 | int32_t pad = 0; | |
407 | this->write((uint8_t*)&pad, sizeof(pad)); | |
408 | } | |
409 | ||
410 | std::shared_ptr<TTransport> transport_; | |
411 | ||
412 | uint32_t rBufSize_; | |
413 | uint32_t wBufSize_; | |
414 | boost::scoped_array<uint8_t> rBuf_; | |
415 | boost::scoped_array<uint8_t> wBuf_; | |
416 | uint32_t bufReclaimThresh_; | |
417 | uint32_t maxFrameSize_; | |
418 | }; | |
419 | ||
420 | /** | |
421 | * Wraps a transport into a framed one. | |
422 | * | |
423 | */ | |
424 | class TFramedTransportFactory : public TTransportFactory { | |
425 | public: | |
426 | TFramedTransportFactory() = default; | |
427 | ||
428 | ~TFramedTransportFactory() override = default; | |
429 | ||
430 | /** | |
431 | * Wraps the transport into a framed one. | |
432 | */ | |
433 | std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override { | |
434 | return std::shared_ptr<TTransport>(new TFramedTransport(trans)); | |
435 | } | |
436 | }; | |
437 | ||
438 | /** | |
439 | * A memory buffer is a tranpsort that simply reads from and writes to an | |
440 | * in memory buffer. Anytime you call write on it, the data is simply placed | |
441 | * into a buffer, and anytime you call read, data is read from that buffer. | |
442 | * | |
443 | * The buffers are allocated using C constructs malloc,realloc, and the size | |
444 | * doubles as necessary. We've considered using scoped | |
445 | * | |
446 | */ | |
447 | class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> { | |
448 | private: | |
449 | // Common initialization done by all constructors. | |
450 | void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) { | |
451 | ||
452 | maxBufferSize_ = (std::numeric_limits<uint32_t>::max)(); | |
453 | ||
454 | if (buf == nullptr && size != 0) { | |
455 | assert(owner); | |
456 | buf = (uint8_t*)std::malloc(size); | |
457 | if (buf == nullptr) { | |
458 | throw std::bad_alloc(); | |
459 | } | |
460 | } | |
461 | ||
462 | buffer_ = buf; | |
463 | bufferSize_ = size; | |
464 | ||
465 | rBase_ = buffer_; | |
466 | rBound_ = buffer_ + wPos; | |
467 | // TODO(dreiss): Investigate NULL-ing this if !owner. | |
468 | wBase_ = buffer_ + wPos; | |
469 | wBound_ = buffer_ + bufferSize_; | |
470 | ||
471 | owner_ = owner; | |
472 | ||
473 | // rBound_ is really an artifact. In principle, it should always be | |
474 | // equal to wBase_. We update it in a few places (computeRead, etc.). | |
475 | } | |
476 | ||
477 | public: | |
478 | static const uint32_t defaultSize = 1024; | |
479 | ||
480 | /** | |
481 | * This enum specifies how a TMemoryBuffer should treat | |
482 | * memory passed to it via constructors or resetBuffer. | |
483 | * | |
484 | * OBSERVE: | |
485 | * TMemoryBuffer will simply store a pointer to the memory. | |
486 | * It is the callers responsibility to ensure that the pointer | |
487 | * remains valid for the lifetime of the TMemoryBuffer, | |
488 | * and that it is properly cleaned up. | |
489 | * Note that no data can be written to observed buffers. | |
490 | * | |
491 | * COPY: | |
492 | * TMemoryBuffer will make an internal copy of the buffer. | |
493 | * The caller has no responsibilities. | |
494 | * | |
495 | * TAKE_OWNERSHIP: | |
496 | * TMemoryBuffer will become the "owner" of the buffer, | |
497 | * and will be responsible for freeing it. | |
498 | * The membory must have been allocated with malloc. | |
499 | */ | |
500 | enum MemoryPolicy { OBSERVE = 1, COPY = 2, TAKE_OWNERSHIP = 3 }; | |
501 | ||
502 | /** | |
503 | * Construct a TMemoryBuffer with a default-sized buffer, | |
504 | * owned by the TMemoryBuffer object. | |
505 | */ | |
506 | TMemoryBuffer() { initCommon(nullptr, defaultSize, true, 0); } | |
507 | ||
508 | /** | |
509 | * Construct a TMemoryBuffer with a buffer of a specified size, | |
510 | * owned by the TMemoryBuffer object. | |
511 | * | |
512 | * @param sz The initial size of the buffer. | |
513 | */ | |
514 | TMemoryBuffer(uint32_t sz) { initCommon(nullptr, sz, true, 0); } | |
515 | ||
516 | /** | |
517 | * Construct a TMemoryBuffer with buf as its initial contents. | |
518 | * | |
519 | * @param buf The initial contents of the buffer. | |
520 | * Note that, while buf is a non-const pointer, | |
521 | * TMemoryBuffer will not write to it if policy == OBSERVE, | |
522 | * so it is safe to const_cast<uint8_t*>(whatever). | |
523 | * @param sz The size of @c buf. | |
524 | * @param policy See @link MemoryPolicy @endlink . | |
525 | */ | |
526 | TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { | |
527 | if (buf == nullptr && sz != 0) { | |
528 | throw TTransportException(TTransportException::BAD_ARGS, | |
529 | "TMemoryBuffer given null buffer with non-zero size."); | |
530 | } | |
531 | ||
532 | switch (policy) { | |
533 | case OBSERVE: | |
534 | case TAKE_OWNERSHIP: | |
535 | initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz); | |
536 | break; | |
537 | case COPY: | |
538 | initCommon(nullptr, sz, true, 0); | |
539 | this->write(buf, sz); | |
540 | break; | |
541 | default: | |
542 | throw TTransportException(TTransportException::BAD_ARGS, | |
543 | "Invalid MemoryPolicy for TMemoryBuffer"); | |
544 | } | |
545 | } | |
546 | ||
547 | ~TMemoryBuffer() override { | |
548 | if (owner_) { | |
549 | std::free(buffer_); | |
550 | } | |
551 | } | |
552 | ||
553 | bool isOpen() const override { return true; } | |
554 | ||
555 | bool peek() override { return (rBase_ < wBase_); } | |
556 | ||
557 | void open() override {} | |
558 | ||
559 | void close() override {} | |
560 | ||
561 | // TODO(dreiss): Make bufPtr const. | |
562 | void getBuffer(uint8_t** bufPtr, uint32_t* sz) { | |
563 | *bufPtr = rBase_; | |
564 | *sz = static_cast<uint32_t>(wBase_ - rBase_); | |
565 | } | |
566 | ||
567 | std::string getBufferAsString() { | |
568 | if (buffer_ == nullptr) { | |
569 | return ""; | |
570 | } | |
571 | uint8_t* buf; | |
572 | uint32_t sz; | |
573 | getBuffer(&buf, &sz); | |
574 | return std::string((char*)buf, (std::string::size_type)sz); | |
575 | } | |
576 | ||
577 | void appendBufferToString(std::string& str) { | |
578 | if (buffer_ == nullptr) { | |
579 | return; | |
580 | } | |
581 | uint8_t* buf; | |
582 | uint32_t sz; | |
583 | getBuffer(&buf, &sz); | |
584 | str.append((char*)buf, sz); | |
585 | } | |
586 | ||
587 | void resetBuffer() { | |
588 | rBase_ = buffer_; | |
589 | rBound_ = buffer_; | |
590 | wBase_ = buffer_; | |
591 | // It isn't safe to write into a buffer we don't own. | |
592 | if (!owner_) { | |
593 | wBound_ = wBase_; | |
594 | bufferSize_ = 0; | |
595 | } | |
596 | } | |
597 | ||
598 | /// See constructor documentation. | |
599 | void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { | |
600 | // Use a variant of the copy-and-swap trick for assignment operators. | |
601 | // This is sub-optimal in terms of performance for two reasons: | |
602 | // 1/ The constructing and swapping of the (small) values | |
603 | // in the temporary object takes some time, and is not necessary. | |
604 | // 2/ If policy == COPY, we allocate the new buffer before | |
605 | // freeing the old one, precluding the possibility of | |
606 | // reusing that memory. | |
607 | // I doubt that either of these problems could be optimized away, | |
608 | // but the second is probably no a common case, and the first is minor. | |
609 | // I don't expect resetBuffer to be a common operation, so I'm willing to | |
610 | // bite the performance bullet to make the method this simple. | |
611 | ||
612 | // Construct the new buffer. | |
613 | TMemoryBuffer new_buffer(buf, sz, policy); | |
614 | // Move it into ourself. | |
615 | this->swap(new_buffer); | |
616 | // Our old self gets destroyed. | |
617 | } | |
618 | ||
619 | /// See constructor documentation. | |
620 | void resetBuffer(uint32_t sz) { | |
621 | // Construct the new buffer. | |
622 | TMemoryBuffer new_buffer(sz); | |
623 | // Move it into ourself. | |
624 | this->swap(new_buffer); | |
625 | // Our old self gets destroyed. | |
626 | } | |
627 | ||
628 | std::string readAsString(uint32_t len) { | |
629 | std::string str; | |
630 | (void)readAppendToString(str, len); | |
631 | return str; | |
632 | } | |
633 | ||
634 | uint32_t readAppendToString(std::string& str, uint32_t len); | |
635 | ||
636 | // return number of bytes read | |
637 | uint32_t readEnd() override { | |
638 | // This cast should be safe, because buffer_'s size is a uint32_t | |
639 | auto bytes = static_cast<uint32_t>(rBase_ - buffer_); | |
640 | if (rBase_ == wBase_) { | |
641 | resetBuffer(); | |
642 | } | |
643 | return bytes; | |
644 | } | |
645 | ||
646 | // Return number of bytes written | |
647 | uint32_t writeEnd() override { | |
648 | // This cast should be safe, because buffer_'s size is a uint32_t | |
649 | return static_cast<uint32_t>(wBase_ - buffer_); | |
650 | } | |
651 | ||
652 | uint32_t available_read() const { | |
653 | // Remember, wBase_ is the real rBound_. | |
654 | return static_cast<uint32_t>(wBase_ - rBase_); | |
655 | } | |
656 | ||
657 | uint32_t available_write() const { return static_cast<uint32_t>(wBound_ - wBase_); } | |
658 | ||
659 | // Returns a pointer to where the client can write data to append to | |
660 | // the TMemoryBuffer, and ensures the buffer is big enough to accommodate a | |
661 | // write of the provided length. The returned pointer is very convenient for | |
662 | // passing to read(), recv(), or similar. You must call wroteBytes() as soon | |
663 | // as data is written or the buffer will not be aware that data has changed. | |
664 | uint8_t* getWritePtr(uint32_t len) { | |
665 | ensureCanWrite(len); | |
666 | return wBase_; | |
667 | } | |
668 | ||
669 | // Informs the buffer that the client has written 'len' bytes into storage | |
670 | // that had been provided by getWritePtr(). | |
671 | void wroteBytes(uint32_t len); | |
672 | ||
673 | /* | |
674 | * TVirtualTransport provides a default implementation of readAll(). | |
675 | * We want to use the TBufferBase version instead. | |
676 | */ | |
677 | uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); } | |
678 | ||
679 | //! \brief Get the current buffer size | |
680 | //! \returns the current buffer size | |
681 | uint32_t getBufferSize() const { | |
682 | return bufferSize_; | |
683 | } | |
684 | ||
685 | //! \brief Get the current maximum buffer size | |
686 | //! \returns the current maximum buffer size | |
687 | uint32_t getMaxBufferSize() const { | |
688 | return maxBufferSize_; | |
689 | } | |
690 | ||
691 | //! \brief Change the maximum buffer size | |
692 | //! \param[in] maxSize the new maximum buffer size allowed to grow to | |
693 | //! \throws TTransportException(BAD_ARGS) if maxSize is less than the current buffer size | |
694 | void setMaxBufferSize(uint32_t maxSize) { | |
695 | if (maxSize < bufferSize_) { | |
696 | throw TTransportException(TTransportException::BAD_ARGS, | |
697 | "Maximum buffer size would be less than current buffer size"); | |
698 | } | |
699 | maxBufferSize_ = maxSize; | |
700 | } | |
701 | ||
702 | protected: | |
703 | void swap(TMemoryBuffer& that) { | |
704 | using std::swap; | |
705 | swap(buffer_, that.buffer_); | |
706 | swap(bufferSize_, that.bufferSize_); | |
707 | ||
708 | swap(rBase_, that.rBase_); | |
709 | swap(rBound_, that.rBound_); | |
710 | swap(wBase_, that.wBase_); | |
711 | swap(wBound_, that.wBound_); | |
712 | ||
713 | swap(owner_, that.owner_); | |
714 | } | |
715 | ||
716 | // Make sure there's at least 'len' bytes available for writing. | |
717 | void ensureCanWrite(uint32_t len); | |
718 | ||
719 | // Compute the position and available data for reading. | |
720 | void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give); | |
721 | ||
722 | uint32_t readSlow(uint8_t* buf, uint32_t len) override; | |
723 | ||
724 | void writeSlow(const uint8_t* buf, uint32_t len) override; | |
725 | ||
726 | const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override; | |
727 | ||
728 | // Data buffer | |
729 | uint8_t* buffer_; | |
730 | ||
731 | // Allocated buffer size | |
732 | uint32_t bufferSize_; | |
733 | ||
734 | // Maximum allowed size | |
735 | uint32_t maxBufferSize_; | |
736 | ||
737 | // Is this object the owner of the buffer? | |
738 | bool owner_; | |
739 | ||
740 | // Don't forget to update constrctors, initCommon, and swap if | |
741 | // you add new members. | |
742 | }; | |
743 | } | |
744 | } | |
745 | } // apache::thrift::transport | |
746 | ||
747 | #endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ |