1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 use std
::net
::{TcpListener, TcpStream}
;
20 use threadpool
::ThreadPool
;
22 use protocol
::{TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory}
;
23 use transport
::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory}
;
24 use {ApplicationError, ApplicationErrorKind}
;
26 use super::TProcessor
;
28 /// Fixed-size thread-pool blocking Thrift server.
30 /// A `TServer` listens on a given address and submits accepted connections
31 /// to an **unbounded** queue. Connections from this queue are serviced by
32 /// the first available worker thread from a **fixed-size** thread pool. Each
33 /// accepted connection is handled by that worker thread, and communication
34 /// over this thread occurs sequentially and synchronously (i.e. calls block).
35 /// Accepted connections have an input half and an output half, each of which
36 /// uses a `TTransport` and `TInputProtocol`/`TOutputProtocol` to translate
37 /// messages to and from byes. Any combination of `TInputProtocol`, `TOutputProtocol`
38 /// and `TTransport` may be used.
42 /// Creating and running a `TServer` using Thrift-compiler-generated
46 /// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
47 /// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
48 /// use thrift::protocol::{TInputProtocol, TOutputProtocol};
49 /// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory,
50 /// TReadTransportFactory, TWriteTransportFactory};
51 /// use thrift::server::{TProcessor, TServer};
57 /// // processor for `SimpleService`
58 /// struct SimpleServiceSyncProcessor;
59 /// impl SimpleServiceSyncProcessor {
60 /// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
65 /// // `TProcessor` implementation for `SimpleService`
66 /// impl TProcessor for SimpleServiceSyncProcessor {
67 /// fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
72 /// // service functions for SimpleService
73 /// trait SimpleServiceSyncHandler {
74 /// fn service_call(&self) -> thrift::Result<()>;
78 /// // user-code follows
81 /// // define a handler that will be invoked when `service_call` is received
82 /// struct SimpleServiceHandlerImpl;
83 /// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
84 /// fn service_call(&self) -> thrift::Result<()> {
89 /// // instantiate the processor
90 /// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
92 /// // instantiate the server
93 /// let i_tr_fact: Box<TReadTransportFactory> = Box::new(TBufferedReadTransportFactory::new());
94 /// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
95 /// let o_tr_fact: Box<TWriteTransportFactory> = Box::new(TBufferedWriteTransportFactory::new());
96 /// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
98 /// let mut server = TServer::new(
107 /// // start listening for incoming connections
108 /// match server.listen("127.0.0.1:8080") {
109 /// Ok(_) => println!("listen completed"),
110 /// Err(e) => println!("listen failed with error {:?}", e),
114 pub struct TServer
<PRC
, RTF
, IPF
, WTF
, OPF
>
116 PRC
: TProcessor
+ Send
+ Sync
+ '
static,
117 RTF
: TReadTransportFactory
+ '
static,
118 IPF
: TInputProtocolFactory
+ '
static,
119 WTF
: TWriteTransportFactory
+ '
static,
120 OPF
: TOutputProtocolFactory
+ '
static,
122 r_trans_factory
: RTF
,
123 i_proto_factory
: IPF
,
124 w_trans_factory
: WTF
,
125 o_proto_factory
: OPF
,
127 worker_pool
: ThreadPool
,
130 impl<PRC
, RTF
, IPF
, WTF
, OPF
> TServer
<PRC
, RTF
, IPF
, WTF
, OPF
>
132 PRC
: TProcessor
+ Send
+ Sync
+ '
static,
133 RTF
: TReadTransportFactory
+ '
static,
134 IPF
: TInputProtocolFactory
+ '
static,
135 WTF
: TWriteTransportFactory
+ '
static,
136 OPF
: TOutputProtocolFactory
+ '
static,
138 /// Create a `TServer`.
140 /// Each accepted connection has an input and output half, each of which
141 /// requires a `TTransport` and `TProtocol`. `TServer` uses
142 /// `read_transport_factory` and `input_protocol_factory` to create
143 /// implementations for the input, and `write_transport_factory` and
144 /// `output_protocol_factory` to create implementations for the output.
146 read_transport_factory
: RTF
,
147 input_protocol_factory
: IPF
,
148 write_transport_factory
: WTF
,
149 output_protocol_factory
: OPF
,
152 ) -> TServer
<PRC
, RTF
, IPF
, WTF
, OPF
> {
154 r_trans_factory
: read_transport_factory
,
155 i_proto_factory
: input_protocol_factory
,
156 w_trans_factory
: write_transport_factory
,
157 o_proto_factory
: output_protocol_factory
,
158 processor
: Arc
::new(processor
),
159 worker_pool
: ThreadPool
::with_name("Thrift service processor".to_owned(), num_workers
),
163 /// Listen for incoming connections on `listen_address`.
165 /// `listen_address` should be in the form `host:port`,
166 /// for example: `127.0.0.1:8080`.
168 /// Return `()` if successful.
170 /// Return `Err` when the server cannot bind to `listen_address` or there
171 /// is an unrecoverable error.
172 pub fn listen(&mut self, listen_address
: &str) -> ::Result
<()> {
173 let listener
= TcpListener
::bind(listen_address
)?
;
174 for stream
in listener
.incoming() {
177 let (i_prot
, o_prot
) = self.new_protocols_for_connection(s
)?
;
178 let processor
= self.processor
.clone();
180 .execute(move || handle_incoming_connection(processor
, i_prot
, o_prot
));
183 warn
!("failed to accept remote connection with error {:?}", e
);
188 Err(::Error
::Application(ApplicationError
{
189 kind
: ApplicationErrorKind
::Unknown
,
190 message
: "aborted listen loop".into(),
194 fn new_protocols_for_connection(
197 ) -> ::Result
<(Box
<dyn TInputProtocol
+ Send
>, Box
<dyn TOutputProtocol
+ Send
>)> {
198 // create the shared tcp stream
199 let channel
= TTcpChannel
::with_stream(stream
);
201 // split it into two - one to be owned by the
202 // input tran/proto and the other by the output
203 let (r_chan
, w_chan
) = channel
.split()?
;
205 // input protocol and transport
206 let r_tran
= self.r_trans_factory
.create(Box
::new(r_chan
));
207 let i_prot
= self.i_proto_factory
.create(r_tran
);
209 // output protocol and transport
210 let w_tran
= self.w_trans_factory
.create(Box
::new(w_chan
));
211 let o_prot
= self.o_proto_factory
.create(w_tran
);
217 fn handle_incoming_connection
<PRC
>(
219 i_prot
: Box
<dyn TInputProtocol
>,
220 o_prot
: Box
<dyn TOutputProtocol
>,
224 let mut i_prot
= i_prot
;
225 let mut o_prot
= o_prot
;
227 let r
= processor
.process(&mut *i_prot
, &mut *o_prot
);
229 warn
!("processor completed with error: {:?}", e
);