1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 use byteorder
::{BigEndian, ReadBytesExt, WriteBytesExt}
;
21 use std
::io
::{Read, Write}
;
23 use super::{TReadTransport, TReadTransportFactory, TWriteTransport, TWriteTransportFactory}
;
25 /// Default capacity of the read buffer in bytes.
26 const READ_CAPACITY
: usize = 4096;
28 /// Default capacity of the write buffer in bytes.
29 const WRITE_CAPACITY
: usize = 4096;
31 /// Transport that reads framed messages.
33 /// A `TFramedReadTransport` maintains a fixed-size internal read buffer.
34 /// On a call to `TFramedReadTransport::read(...)` one full message - both
35 /// fixed-length header and bytes - is read from the wrapped channel and
36 /// buffered. Subsequent read calls are serviced from the internal buffer
37 /// until it is exhausted, at which point the next full message is read
38 /// from the wrapped channel.
42 /// Create and use a `TFramedReadTransport`.
45 /// use std::io::Read;
46 /// use thrift::transport::{TFramedReadTransport, TTcpChannel};
48 /// let mut c = TTcpChannel::new();
49 /// c.open("localhost:9090").unwrap();
51 /// let mut t = TFramedReadTransport::new(c);
53 /// t.read(&mut vec![0u8; 1]).unwrap();
56 pub struct TFramedReadTransport
<C
>
66 impl<C
> TFramedReadTransport
<C
>
70 /// Create a `TFramedReadTransport` with a default-sized
71 /// internal read buffer that wraps the given `TIoChannel`.
72 pub fn new(channel
: C
) -> TFramedReadTransport
<C
> {
73 TFramedReadTransport
::with_capacity(READ_CAPACITY
, channel
)
76 /// Create a `TFramedTransport` with an internal read buffer
77 /// of size `read_capacity` that wraps the given `TIoChannel`.
78 pub fn with_capacity(read_capacity
: usize, channel
: C
) -> TFramedReadTransport
<C
> {
79 TFramedReadTransport
{
80 buf
: vec
![0; read_capacity
], // FIXME: do I actually have to do this?
88 impl<C
> Read
for TFramedReadTransport
<C
>
92 fn read(&mut self, b
: &mut [u8]) -> io
::Result
<usize> {
93 if self.cap
- self.pos
== 0 {
94 let message_size
= self.chan
.read_i32
::<BigEndian
>()?
as usize;
96 let buf_capacity
= cmp
::max(message_size
, READ_CAPACITY
);
97 self.buf
.resize(buf_capacity
, 0);
99 self.chan
.read_exact(&mut self.buf
[..message_size
])?
;
100 self.cap
= message_size
as usize;
104 let nread
= cmp
::min(b
.len(), self.cap
- self.pos
);
105 b
[..nread
].clone_from_slice(&self.buf
[self.pos
..self.pos
+ nread
]);
112 /// Factory for creating instances of `TFramedReadTransport`.
114 pub struct TFramedReadTransportFactory
;
116 impl TFramedReadTransportFactory
{
117 pub fn new() -> TFramedReadTransportFactory
{
118 TFramedReadTransportFactory {}
122 impl TReadTransportFactory
for TFramedReadTransportFactory
{
123 /// Create a `TFramedReadTransport`.
124 fn create(&self, channel
: Box
<dyn Read
+ Send
>) -> Box
<dyn TReadTransport
+ Send
> {
125 Box
::new(TFramedReadTransport
::new(channel
))
129 /// Transport that writes framed messages.
131 /// A `TFramedWriteTransport` maintains a fixed-size internal write buffer. All
132 /// writes are made to this buffer and are sent to the wrapped channel only
133 /// when `TFramedWriteTransport::flush()` is called. On a flush a fixed-length
134 /// header with a count of the buffered bytes is written, followed by the bytes
139 /// Create and use a `TFramedWriteTransport`.
142 /// use std::io::Write;
143 /// use thrift::transport::{TFramedWriteTransport, TTcpChannel};
145 /// let mut c = TTcpChannel::new();
146 /// c.open("localhost:9090").unwrap();
148 /// let mut t = TFramedWriteTransport::new(c);
150 /// t.write(&[0x00]).unwrap();
151 /// t.flush().unwrap();
154 pub struct TFramedWriteTransport
<C
>
162 impl<C
> TFramedWriteTransport
<C
>
166 /// Create a `TFramedWriteTransport` with default-sized internal
167 /// write buffer that wraps the given `TIoChannel`.
168 pub fn new(channel
: C
) -> TFramedWriteTransport
<C
> {
169 TFramedWriteTransport
::with_capacity(WRITE_CAPACITY
, channel
)
172 /// Create a `TFramedWriteTransport` with an internal write buffer
173 /// of size `write_capacity` that wraps the given `TIoChannel`.
174 pub fn with_capacity(write_capacity
: usize, channel
: C
) -> TFramedWriteTransport
<C
> {
175 TFramedWriteTransport
{
176 buf
: Vec
::with_capacity(write_capacity
),
182 impl<C
> Write
for TFramedWriteTransport
<C
>
186 fn write(&mut self, b
: &[u8]) -> io
::Result
<usize> {
187 let current_capacity
= self.buf
.capacity();
188 let available_space
= current_capacity
- self.buf
.len();
189 if b
.len() > available_space
{
190 let additional_space
= cmp
::max(b
.len() - available_space
, current_capacity
);
191 self.buf
.reserve(additional_space
);
194 self.buf
.extend_from_slice(b
);
198 fn flush(&mut self) -> io
::Result
<()> {
199 let message_size
= self.buf
.len();
201 if let 0 = message_size
{
204 self.channel
.write_i32
::<BigEndian
>(message_size
as i32)?
;
207 // will spin if the underlying channel can't be written to
208 let mut byte_index
= 0;
209 while byte_index
< message_size
{
210 let nwrite
= self.channel
.write(&self.buf
[byte_index
..message_size
])?
;
211 byte_index
= cmp
::min(byte_index
+ nwrite
, message_size
);
214 let buf_capacity
= cmp
::min(self.buf
.capacity(), WRITE_CAPACITY
);
215 self.buf
.resize(buf_capacity
, 0);
222 /// Factory for creating instances of `TFramedWriteTransport`.
224 pub struct TFramedWriteTransportFactory
;
226 impl TFramedWriteTransportFactory
{
227 pub fn new() -> TFramedWriteTransportFactory
{
228 TFramedWriteTransportFactory {}
232 impl TWriteTransportFactory
for TFramedWriteTransportFactory
{
233 /// Create a `TFramedWriteTransport`.
234 fn create(&self, channel
: Box
<dyn Write
+ Send
>) -> Box
<dyn TWriteTransport
+ Send
> {
235 Box
::new(TFramedWriteTransport
::new(channel
))
242 use transport
::mem
::TBufferChannel
;
244 // FIXME: test a forced reserve
247 fn must_read_message_smaller_than_initial_buffer_size() {
248 let c
= TBufferChannel
::with_capacity(10, 10);
249 let mut t
= TFramedReadTransport
::with_capacity(8, c
);
251 t
.chan
.set_readable_bytes(&[
252 0x00, 0x00, 0x00, 0x04, /* message size */
253 0x00, 0x01, 0x02, 0x03, /* message body */
256 let mut buf
= vec
![0; 8];
258 // we've read exactly 4 bytes
259 assert_eq
!(t
.read(&mut buf
).unwrap(), 4);
260 assert_eq
!(&buf
[..4], &[0x00, 0x01, 0x02, 0x03]);
264 fn must_read_message_greater_than_initial_buffer_size() {
265 let c
= TBufferChannel
::with_capacity(10, 10);
266 let mut t
= TFramedReadTransport
::with_capacity(2, c
);
268 t
.chan
.set_readable_bytes(&[
269 0x00, 0x00, 0x00, 0x04, /* message size */
270 0x00, 0x01, 0x02, 0x03, /* message body */
273 let mut buf
= vec
![0; 8];
275 // we've read exactly 4 bytes
276 assert_eq
!(t
.read(&mut buf
).unwrap(), 4);
277 assert_eq
!(&buf
[..4], &[0x00, 0x01, 0x02, 0x03]);
281 fn must_read_multiple_messages_in_sequence_correctly() {
282 let c
= TBufferChannel
::with_capacity(10, 10);
283 let mut t
= TFramedReadTransport
::with_capacity(2, c
);
289 t
.chan
.set_readable_bytes(&[
290 0x00, 0x00, 0x00, 0x04, /* message size */
291 0x00, 0x01, 0x02, 0x03, /* message body */
294 let mut buf
= vec
![0; 8];
296 // we've read exactly 4 bytes
297 assert_eq
!(t
.read(&mut buf
).unwrap(), 4);
298 assert_eq
!(&buf
, &[0x00, 0x01, 0x02, 0x03, 0x00, 0x00, 0x00, 0x00]);
304 t
.chan
.set_readable_bytes(&[
305 0x00, 0x00, 0x00, 0x01, /* message size */
306 0x04, /* message body */
309 let mut buf
= vec
![0; 8];
311 // we've read exactly 1 byte
312 assert_eq
!(t
.read(&mut buf
).unwrap(), 1);
313 assert_eq
!(&buf
, &[0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
317 fn must_write_message_smaller_than_buffer_size() {
318 let mem
= TBufferChannel
::with_capacity(0, 0);
319 let mut t
= TFramedWriteTransport
::with_capacity(20, mem
);
323 // should have written 10 bytes
324 assert_eq
!(t
.write(&b
).unwrap(), 10);
328 fn must_return_zero_if_caller_calls_write_with_empty_buffer() {
329 let mem
= TBufferChannel
::with_capacity(0, 10);
330 let mut t
= TFramedWriteTransport
::with_capacity(10, mem
);
332 let expected
: [u8; 0] = [];
334 assert_eq
!(t
.write(&[]).unwrap(), 0);
335 assert_eq_transport_written_bytes
!(t
, expected
);
339 fn must_write_to_inner_transport_on_flush() {
340 let mem
= TBufferChannel
::with_capacity(10, 10);
341 let mut t
= TFramedWriteTransport
::new(mem
);
343 let b
: [u8; 5] = [0x00, 0x01, 0x02, 0x03, 0x04];
344 assert_eq
!(t
.write(&b
).unwrap(), 5);
345 assert_eq_transport_num_written_bytes
!(t
, 0);
347 assert
!(t
.flush().is_ok());
349 let expected_bytes
= [
350 0x00, 0x00, 0x00, 0x05, /* message size */
351 0x00, 0x01, 0x02, 0x03, 0x04, /* message body */
354 assert_eq_transport_written_bytes
!(t
, expected_bytes
);
358 fn must_write_message_greater_than_buffer_size_00() {
359 let mem
= TBufferChannel
::with_capacity(0, 10);
361 // IMPORTANT: DO **NOT** CHANGE THE WRITE_CAPACITY OR THE NUMBER OF BYTES TO BE WRITTEN!
362 // these lengths were chosen to be just long enough
363 // that doubling the capacity is a **worse** choice than
364 // simply resizing the buffer to b.len()
366 let mut t
= TFramedWriteTransport
::with_capacity(1, mem
);
367 let b
= [0x00, 0x01, 0x02];
369 // should have written 3 bytes
370 assert_eq
!(t
.write(&b
).unwrap(), 3);
371 assert_eq_transport_num_written_bytes
!(t
, 0);
373 assert
!(t
.flush().is_ok());
375 let expected_bytes
= [
376 0x00, 0x00, 0x00, 0x03, /* message size */
377 0x00, 0x01, 0x02, /* message body */
380 assert_eq_transport_written_bytes
!(t
, expected_bytes
);
384 fn must_write_message_greater_than_buffer_size_01() {
385 let mem
= TBufferChannel
::with_capacity(0, 10);
387 // IMPORTANT: DO **NOT** CHANGE THE WRITE_CAPACITY OR THE NUMBER OF BYTES TO BE WRITTEN!
388 // these lengths were chosen to be just long enough
389 // that doubling the capacity is a **better** choice than
390 // simply resizing the buffer to b.len()
392 let mut t
= TFramedWriteTransport
::with_capacity(2, mem
);
393 let b
= [0x00, 0x01, 0x02];
395 // should have written 3 bytes
396 assert_eq
!(t
.write(&b
).unwrap(), 3);
397 assert_eq_transport_num_written_bytes
!(t
, 0);
399 assert
!(t
.flush().is_ok());
401 let expected_bytes
= [
402 0x00, 0x00, 0x00, 0x03, /* message size */
403 0x00, 0x01, 0x02, /* message body */
406 assert_eq_transport_written_bytes
!(t
, expected_bytes
);
410 fn must_return_error_if_nothing_can_be_written_to_inner_transport_on_flush() {
411 let mem
= TBufferChannel
::with_capacity(0, 0);
412 let mut t
= TFramedWriteTransport
::with_capacity(1, mem
);
416 // should have written 10 bytes
417 assert_eq
!(t
.write(&b
).unwrap(), 10);
422 // this time we'll error out because the flush can't write to the underlying channel
427 fn must_write_successfully_after_flush() {
428 // IMPORTANT: write capacity *MUST* be greater
429 // than message sizes used in this test + 4-byte frame header
430 let mem
= TBufferChannel
::with_capacity(0, 10);
431 let mut t
= TFramedWriteTransport
::with_capacity(5, mem
);
434 let first_message
: [u8; 5] = [0x00, 0x01, 0x02, 0x03, 0x04];
435 assert_eq
!(t
.write(&first_message
).unwrap(), 5);
436 assert
!(t
.flush().is_ok());
438 let mut expected
= Vec
::new();
439 expected
.write_all(&[0x00, 0x00, 0x00, 0x05]).unwrap(); // message size
440 expected
.extend_from_slice(&first_message
);
442 // check the flushed bytes
443 assert_eq
!(t
.channel
.write_bytes(), expected
);
445 // reset our underlying transport
446 t
.channel
.empty_write_buffer();
448 let second_message
: [u8; 3] = [0x05, 0x06, 0x07];
449 assert_eq
!(t
.write(&second_message
).unwrap(), 3);
450 assert
!(t
.flush().is_ok());
453 expected
.write_all(&[0x00, 0x00, 0x00, 0x03]).unwrap(); // message size
454 expected
.extend_from_slice(&second_message
);
456 // check the flushed bytes
457 assert_eq
!(t
.channel
.write_bytes(), expected
);