]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | use std::collections::HashMap; | |
19 | use std::convert::Into; | |
20 | use std::fmt; | |
21 | use std::fmt::{Debug, Formatter}; | |
22 | use std::sync::{Arc, Mutex}; | |
23 | ||
24 | use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol}; | |
25 | ||
26 | use super::{handle_process_result, TProcessor}; | |
27 | ||
28 | const MISSING_SEPARATOR_AND_NO_DEFAULT: &'static str = | |
29 | "missing service separator and no default processor set"; | |
30 | type ThreadSafeProcessor = Box<dyn TProcessor + Send + Sync>; | |
31 | ||
32 | /// A `TProcessor` that can demux service calls to multiple underlying | |
33 | /// Thrift services. | |
34 | /// | |
35 | /// Users register service-specific `TProcessor` instances with a | |
36 | /// `TMultiplexedProcessor`, and then register that processor with a server | |
37 | /// implementation. Following that, all incoming service calls are automatically | |
38 | /// routed to the service-specific `TProcessor`. | |
39 | /// | |
40 | /// A `TMultiplexedProcessor` can only handle messages sent by a | |
41 | /// `TMultiplexedOutputProtocol`. | |
42 | #[derive(Default)] | |
43 | pub struct TMultiplexedProcessor { | |
44 | stored: Mutex<StoredProcessors>, | |
45 | } | |
46 | ||
47 | #[derive(Default)] | |
48 | struct StoredProcessors { | |
49 | processors: HashMap<String, Arc<ThreadSafeProcessor>>, | |
50 | default_processor: Option<Arc<ThreadSafeProcessor>>, | |
51 | } | |
52 | ||
53 | impl TMultiplexedProcessor { | |
54 | /// Create a new `TMultiplexedProcessor` with no registered service-specific | |
55 | /// processors. | |
56 | pub fn new() -> TMultiplexedProcessor { | |
57 | TMultiplexedProcessor { | |
58 | stored: Mutex::new(StoredProcessors { | |
59 | processors: HashMap::new(), | |
60 | default_processor: None, | |
61 | }), | |
62 | } | |
63 | } | |
64 | ||
65 | /// Register a service-specific `processor` for the service named | |
66 | /// `service_name`. This implementation is also backwards-compatible with | |
67 | /// non-multiplexed clients. Set `as_default` to `true` to allow | |
68 | /// non-namespaced requests to be dispatched to a default processor. | |
69 | /// | |
70 | /// Returns success if a new entry was inserted. Returns an error if: | |
71 | /// * A processor exists for `service_name` | |
72 | /// * You attempt to register a processor as default, and an existing default exists | |
73 | #[cfg_attr(feature = "cargo-clippy", allow(map_entry))] | |
74 | pub fn register<S: Into<String>>( | |
75 | &mut self, | |
76 | service_name: S, | |
77 | processor: Box<dyn TProcessor + Send + Sync>, | |
78 | as_default: bool, | |
79 | ) -> ::Result<()> { | |
80 | let mut stored = self.stored.lock().unwrap(); | |
81 | ||
82 | let name = service_name.into(); | |
83 | if !stored.processors.contains_key(&name) { | |
84 | let processor = Arc::new(processor); | |
85 | ||
86 | if as_default { | |
87 | if stored.default_processor.is_none() { | |
88 | stored.processors.insert(name, processor.clone()); | |
89 | stored.default_processor = Some(processor.clone()); | |
90 | Ok(()) | |
91 | } else { | |
92 | Err("cannot reset default processor".into()) | |
93 | } | |
94 | } else { | |
95 | stored.processors.insert(name, processor); | |
96 | Ok(()) | |
97 | } | |
98 | } else { | |
99 | Err(format!("cannot overwrite existing processor for service {}", name).into()) | |
100 | } | |
101 | } | |
102 | ||
103 | fn process_message( | |
104 | &self, | |
105 | msg_ident: &TMessageIdentifier, | |
106 | i_prot: &mut dyn TInputProtocol, | |
107 | o_prot: &mut dyn TOutputProtocol, | |
108 | ) -> ::Result<()> { | |
109 | let (svc_name, svc_call) = split_ident_name(&msg_ident.name); | |
110 | debug!("routing svc_name {:?} svc_call {}", &svc_name, &svc_call); | |
111 | ||
112 | let processor: Option<Arc<ThreadSafeProcessor>> = { | |
113 | let stored = self.stored.lock().unwrap(); | |
114 | if let Some(name) = svc_name { | |
115 | stored.processors.get(name).cloned() | |
116 | } else { | |
117 | stored.default_processor.clone() | |
118 | } | |
119 | }; | |
120 | ||
121 | match processor { | |
122 | Some(arc) => { | |
123 | let new_msg_ident = TMessageIdentifier::new( | |
124 | svc_call, | |
125 | msg_ident.message_type, | |
126 | msg_ident.sequence_number, | |
127 | ); | |
128 | let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident); | |
129 | (*arc).process(&mut proxy_i_prot, o_prot) | |
130 | } | |
131 | None => Err(missing_processor_message(svc_name).into()), | |
132 | } | |
133 | } | |
134 | } | |
135 | ||
136 | impl TProcessor for TMultiplexedProcessor { | |
137 | fn process(&self, i_prot: &mut dyn TInputProtocol, o_prot: &mut dyn TOutputProtocol) -> ::Result<()> { | |
138 | let msg_ident = i_prot.read_message_begin()?; | |
139 | ||
140 | debug!("process incoming msg id:{:?}", &msg_ident); | |
141 | let res = self.process_message(&msg_ident, i_prot, o_prot); | |
142 | ||
143 | handle_process_result(&msg_ident, res, o_prot) | |
144 | } | |
145 | } | |
146 | ||
147 | impl Debug for TMultiplexedProcessor { | |
148 | fn fmt(&self, f: &mut Formatter) -> fmt::Result { | |
149 | let stored = self.stored.lock().unwrap(); | |
150 | write!( | |
151 | f, | |
152 | "TMultiplexedProcess {{ registered_count: {:?} default: {:?} }}", | |
153 | stored.processors.keys().len(), | |
154 | stored.default_processor.is_some() | |
155 | ) | |
156 | } | |
157 | } | |
158 | ||
159 | fn split_ident_name(ident_name: &str) -> (Option<&str>, &str) { | |
160 | ident_name | |
161 | .find(':') | |
162 | .map(|pos| { | |
163 | let (svc_name, svc_call) = ident_name.split_at(pos); | |
164 | let (_, svc_call) = svc_call.split_at(1); // remove colon from service call name | |
165 | (Some(svc_name), svc_call) | |
166 | }) | |
167 | .or_else(|| Some((None, ident_name))) | |
168 | .unwrap() | |
169 | } | |
170 | ||
171 | fn missing_processor_message(svc_name: Option<&str>) -> String { | |
172 | match svc_name { | |
173 | Some(name) => format!("no processor found for service {}", name), | |
174 | None => MISSING_SEPARATOR_AND_NO_DEFAULT.to_owned(), | |
175 | } | |
176 | } | |
177 | ||
178 | #[cfg(test)] | |
179 | mod tests { | |
180 | use std::convert::Into; | |
181 | use std::sync::atomic::{AtomicBool, Ordering}; | |
182 | use std::sync::Arc; | |
183 | ||
184 | use protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TMessageIdentifier, TMessageType}; | |
185 | use transport::{ReadHalf, TBufferChannel, TIoChannel, WriteHalf}; | |
186 | use {ApplicationError, ApplicationErrorKind}; | |
187 | ||
188 | use super::*; | |
189 | ||
190 | #[test] | |
191 | fn should_split_name_into_proper_separator_and_service_call() { | |
192 | let ident_name = "foo:bar_call"; | |
193 | let (serv, call) = split_ident_name(&ident_name); | |
194 | assert_eq!(serv, Some("foo")); | |
195 | assert_eq!(call, "bar_call"); | |
196 | } | |
197 | ||
198 | #[test] | |
199 | fn should_return_full_ident_if_no_separator_exists() { | |
200 | let ident_name = "bar_call"; | |
201 | let (serv, call) = split_ident_name(&ident_name); | |
202 | assert_eq!(serv, None); | |
203 | assert_eq!(call, "bar_call"); | |
204 | } | |
205 | ||
206 | #[test] | |
207 | fn should_write_error_if_no_separator_found_and_no_default_processor_exists() { | |
208 | let (mut i, mut o) = build_objects(); | |
209 | ||
210 | let sent_ident = TMessageIdentifier::new("foo", TMessageType::Call, 10); | |
211 | o.write_message_begin(&sent_ident).unwrap(); | |
212 | o.flush().unwrap(); | |
213 | o.transport.copy_write_buffer_to_read_buffer(); | |
214 | o.transport.empty_write_buffer(); | |
215 | ||
216 | let p = TMultiplexedProcessor::new(); | |
217 | p.process(&mut i, &mut o).unwrap(); // at this point an error should be written out | |
218 | ||
219 | i.transport.set_readable_bytes(&o.transport.write_bytes()); | |
220 | let rcvd_ident = i.read_message_begin().unwrap(); | |
221 | let expected_ident = TMessageIdentifier::new("foo", TMessageType::Exception, 10); | |
222 | assert_eq!(rcvd_ident, expected_ident); | |
223 | let rcvd_err = ::Error::read_application_error_from_in_protocol(&mut i).unwrap(); | |
224 | let expected_err = ApplicationError::new( | |
225 | ApplicationErrorKind::Unknown, | |
226 | MISSING_SEPARATOR_AND_NO_DEFAULT, | |
227 | ); | |
228 | assert_eq!(rcvd_err, expected_err); | |
229 | } | |
230 | ||
231 | #[test] | |
232 | fn should_write_error_if_separator_exists_and_no_processor_found() { | |
233 | let (mut i, mut o) = build_objects(); | |
234 | ||
235 | let sent_ident = TMessageIdentifier::new("missing:call", TMessageType::Call, 10); | |
236 | o.write_message_begin(&sent_ident).unwrap(); | |
237 | o.flush().unwrap(); | |
238 | o.transport.copy_write_buffer_to_read_buffer(); | |
239 | o.transport.empty_write_buffer(); | |
240 | ||
241 | let p = TMultiplexedProcessor::new(); | |
242 | p.process(&mut i, &mut o).unwrap(); // at this point an error should be written out | |
243 | ||
244 | i.transport.set_readable_bytes(&o.transport.write_bytes()); | |
245 | let rcvd_ident = i.read_message_begin().unwrap(); | |
246 | let expected_ident = TMessageIdentifier::new("missing:call", TMessageType::Exception, 10); | |
247 | assert_eq!(rcvd_ident, expected_ident); | |
248 | let rcvd_err = ::Error::read_application_error_from_in_protocol(&mut i).unwrap(); | |
249 | let expected_err = ApplicationError::new( | |
250 | ApplicationErrorKind::Unknown, | |
251 | missing_processor_message(Some("missing")), | |
252 | ); | |
253 | assert_eq!(rcvd_err, expected_err); | |
254 | } | |
255 | ||
256 | #[derive(Default)] | |
257 | struct Service { | |
258 | pub invoked: Arc<AtomicBool>, | |
259 | } | |
260 | ||
261 | impl TProcessor for Service { | |
262 | fn process(&self, _: &mut dyn TInputProtocol, _: &mut dyn TOutputProtocol) -> ::Result<()> { | |
263 | let res = self | |
264 | .invoked | |
265 | .compare_and_swap(false, true, Ordering::Relaxed); | |
266 | if res { | |
267 | Ok(()) | |
268 | } else { | |
269 | Err("failed swap".into()) | |
270 | } | |
271 | } | |
272 | } | |
273 | ||
274 | #[test] | |
275 | fn should_route_call_to_correct_processor() { | |
276 | let (mut i, mut o) = build_objects(); | |
277 | ||
278 | // build the services | |
279 | let svc_1 = Service { | |
280 | invoked: Arc::new(AtomicBool::new(false)), | |
281 | }; | |
282 | let atm_1 = svc_1.invoked.clone(); | |
283 | let svc_2 = Service { | |
284 | invoked: Arc::new(AtomicBool::new(false)), | |
285 | }; | |
286 | let atm_2 = svc_2.invoked.clone(); | |
287 | ||
288 | // register them | |
289 | let mut p = TMultiplexedProcessor::new(); | |
290 | p.register("service_1", Box::new(svc_1), false).unwrap(); | |
291 | p.register("service_2", Box::new(svc_2), false).unwrap(); | |
292 | ||
293 | // make the service call | |
294 | let sent_ident = TMessageIdentifier::new("service_1:call", TMessageType::Call, 10); | |
295 | o.write_message_begin(&sent_ident).unwrap(); | |
296 | o.flush().unwrap(); | |
297 | o.transport.copy_write_buffer_to_read_buffer(); | |
298 | o.transport.empty_write_buffer(); | |
299 | ||
300 | p.process(&mut i, &mut o).unwrap(); | |
301 | ||
302 | // service 1 should have been invoked, not service 2 | |
303 | assert_eq!(atm_1.load(Ordering::Relaxed), true); | |
304 | assert_eq!(atm_2.load(Ordering::Relaxed), false); | |
305 | } | |
306 | ||
307 | #[test] | |
308 | fn should_route_call_to_correct_processor_if_no_separator_exists_and_default_processor_set() { | |
309 | let (mut i, mut o) = build_objects(); | |
310 | ||
311 | // build the services | |
312 | let svc_1 = Service { | |
313 | invoked: Arc::new(AtomicBool::new(false)), | |
314 | }; | |
315 | let atm_1 = svc_1.invoked.clone(); | |
316 | let svc_2 = Service { | |
317 | invoked: Arc::new(AtomicBool::new(false)), | |
318 | }; | |
319 | let atm_2 = svc_2.invoked.clone(); | |
320 | ||
321 | // register them | |
322 | let mut p = TMultiplexedProcessor::new(); | |
323 | p.register("service_1", Box::new(svc_1), false).unwrap(); | |
324 | p.register("service_2", Box::new(svc_2), true).unwrap(); // second processor is default | |
325 | ||
326 | // make the service call (it's an old client, so we have to be backwards compatible) | |
327 | let sent_ident = TMessageIdentifier::new("old_call", TMessageType::Call, 10); | |
328 | o.write_message_begin(&sent_ident).unwrap(); | |
329 | o.flush().unwrap(); | |
330 | o.transport.copy_write_buffer_to_read_buffer(); | |
331 | o.transport.empty_write_buffer(); | |
332 | ||
333 | p.process(&mut i, &mut o).unwrap(); | |
334 | ||
335 | // service 2 should have been invoked, not service 1 | |
336 | assert_eq!(atm_1.load(Ordering::Relaxed), false); | |
337 | assert_eq!(atm_2.load(Ordering::Relaxed), true); | |
338 | } | |
339 | ||
340 | fn build_objects() -> ( | |
341 | TBinaryInputProtocol<ReadHalf<TBufferChannel>>, | |
342 | TBinaryOutputProtocol<WriteHalf<TBufferChannel>>, | |
343 | ) { | |
344 | let c = TBufferChannel::with_capacity(128, 128); | |
345 | let (r_c, w_c) = c.split().unwrap(); | |
346 | ( | |
347 | TBinaryInputProtocol::new(r_c, true), | |
348 | TBinaryOutputProtocol::new(w_c, true), | |
349 | ) | |
350 | } | |
351 | } |