]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | (* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | *) | |
19 | ||
20 | unit Thrift.Processor.Multiplex; | |
21 | ||
22 | ||
23 | interface | |
24 | ||
25 | uses | |
26 | SysUtils, | |
27 | Generics.Collections, | |
28 | Thrift, | |
29 | Thrift.Protocol, | |
30 | Thrift.Protocol.Multiplex; | |
31 | ||
{ TMultiplexedProcessor is a TProcessor that allows a single TServer to provide multiple services.
  To do so, instantiate the processor and then register the individual service processors
  with it. The following example illustrates the idea using Java syntax; in this unit the
  implementing class is TMultiplexedProcessorImpl and the registration method is RegisterProcessor:


     TMultiplexedProcessor processor = new TMultiplexedProcessor();

     processor.registerProcessor(
         "Calculator",
         new Calculator.Processor(new CalculatorHandler()));

     processor.registerProcessor(
         "WeatherReport",
         new WeatherReport.Processor(new WeatherReportHandler()));

     TServerTransport t = new TServerSocket(9090);
     TSimpleServer server = new TSimpleServer(processor, t);

     server.serve();
}
52 | ||
53 | ||
54 | type | |
55 | IMultiplexedProcessor = interface( IProcessor) | |
56 | ['{807F9D19-6CF4-4789-840E-93E87A12EB63}'] | |
57 | // Register a service with this TMultiplexedProcessor. This allows us | |
58 | // to broker requests to individual services by using the service name | |
59 | // to select them at request time. | |
60 | procedure RegisterProcessor( const serviceName : String; const processor : IProcessor; const asDefault : Boolean = FALSE); | |
61 | end; | |
62 | ||
63 | ||
64 | TMultiplexedProcessorImpl = class( TInterfacedObject, IMultiplexedProcessor, IProcessor) | |
65 | private type | |
66 | // Our goal was to work with any protocol. In order to do that, we needed | |
67 | // to allow them to call readMessageBegin() and get a TMessage in exactly | |
68 | // the standard format, without the service name prepended to TMessage.name. | |
69 | TStoredMessageProtocol = class( TProtocolDecorator) | |
70 | private | |
71 | FMessageBegin : TThriftMessage; | |
72 | public | |
73 | constructor Create( const protocol : IProtocol; const aMsgBegin : TThriftMessage); | |
74 | function ReadMessageBegin: TThriftMessage; override; | |
75 | end; | |
76 | ||
77 | private | |
78 | FServiceProcessorMap : TDictionary<String, IProcessor>; | |
79 | FDefaultProcessor : IProcessor; | |
80 | ||
81 | procedure Error( const oprot : IProtocol; const msg : TThriftMessage; | |
82 | extype : TApplicationExceptionSpecializedClass; const etxt : string); | |
83 | ||
84 | public | |
85 | constructor Create; | |
86 | destructor Destroy; override; | |
87 | ||
88 | // Register a service with this TMultiplexedProcessorImpl. This allows us | |
89 | // to broker requests to individual services by using the service name | |
90 | // to select them at request time. | |
91 | procedure RegisterProcessor( const serviceName : String; const processor : IProcessor; const asDefault : Boolean = FALSE); | |
92 | ||
93 | { This implementation of process performs the following steps: | |
94 | - Read the beginning of the message. | |
95 | - Extract the service name from the message. | |
96 | - Using the service name to locate the appropriate processor. | |
97 | - Dispatch to the processor, with a decorated instance of TProtocol | |
98 | that allows readMessageBegin() to return the original TMessage. | |
99 | ||
100 | An exception is thrown if the message type is not CALL or ONEWAY | |
101 | or if the service is unknown (or not properly registered). | |
102 | } | |
103 | function Process( const iprot, oprot: IProtocol; const events : IProcessorEvents = nil): Boolean; | |
104 | end; | |
105 | ||
106 | ||
107 | implementation | |
108 | ||
// Wraps "protocol" and keeps a copy of the message header "aMsgBegin",
// which ReadMessageBegin will hand out later.
constructor TMultiplexedProcessorImpl.TStoredMessageProtocol.Create( const protocol : IProtocol; const aMsgBegin : TThriftMessage);
begin
  inherited Create( protocol);
  Self.FMessageBegin := aMsgBegin;
end;
114 | ||
115 | ||
// Returns the message header stored by the constructor instead of reading
// one from the wrapped protocol.
function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: TThriftMessage;
begin
  Exit( FMessageBegin);
end;
120 | ||
121 | ||
// Creates the processor with an empty service map and no default processor.
constructor TMultiplexedProcessorImpl.Create;
begin
  inherited Create;

  // owned by this instance, released in Destroy
  Self.FServiceProcessorMap := TDictionary<string,IProcessor>.Create;
end;
127 | ||
128 | ||
// Releases the service map. The inherited destructor runs even if freeing
// the map raises.
destructor TMultiplexedProcessorImpl.Destroy;
begin
  try
    FreeAndNil( Self.FServiceProcessorMap);
  finally
    inherited;
  end;
end;
137 | ||
138 | ||
// Registers "processor" under "serviceName".
//
// serviceName : name used by clients to address the service
// processor   : processor that handles requests for that service
// asDefault   : if TRUE, the processor also becomes the default processor,
//               which Process uses for messages without a service name
//
// Raises TApplicationExceptionInternalError if asDefault is TRUE while a
// default processor is already set. In that case nothing is registered.
// Any exception raised by the map's Add (e.g. for an already used name)
// propagates to the caller and leaves the default processor unchanged.
procedure TMultiplexedProcessorImpl.RegisterProcessor( const serviceName : String; const processor : IProcessor; const asDefault : Boolean);
begin
  // Check the "single default" rule BEFORE touching the map. Otherwise a
  // rejected registration would still leave the service in the map.
  if asDefault and (FDefaultProcessor <> nil)
  then raise TApplicationExceptionInternalError.Create('Only one default service allowed');

  FServiceProcessorMap.Add( serviceName, processor);

  // Only reached if Add succeeded, so map and default stay consistent.
  if asDefault
  then FDefaultProcessor := processor;
end;
149 | ||
150 | ||
// Reports a failure to the client: writes an exception message to "oprot"
// and flushes its transport.
//
// oprot  : protocol the reply is written to
// msg    : the offending request; its name and sequence ID are reused for the reply
// extype : class of the application exception to instantiate
// etxt   : text passed to the exception's constructor
procedure TMultiplexedProcessorImpl.Error( const oprot : IProtocol; const msg : TThriftMessage;
                                           extype : TApplicationExceptionSpecializedClass;
                                           const etxt : string);
var
  reply   : TThriftMessage;
  failure : TApplicationException;
begin
  failure := extype.Create( etxt);
  try
    // reply header: same name and sequence ID, but message type "Exception"
    Init( reply, msg.Name, TMessageType.Exception, msg.SeqID);

    oprot.WriteMessageBegin( reply);
    failure.Write( oprot);
    oprot.WriteMessageEnd;
    oprot.Transport.Flush;
  finally
    // the exception object is only serialized, never raised, so free it here
    failure.Free;
  end;
end;
170 | ||
171 | ||
// Reads the message header from "iprot", selects the target processor by the
// service name contained in the message name and forwards the call to it.
//
// Returns the result of the selected processor's Process call. Returns FALSE
// after sending an exception message via "oprot" when
//  - the message type is neither Call nor Oneway,
//  - the service name is not registered, or
//  - the message name has no service name and no default processor is set.
function TMultiplexedProcessorImpl.Process(const iprot, oprot : IProtocol; const events : IProcessorEvents = nil): Boolean;
const
  ERROR_INVALID_MSGTYPE   = 'Message must be "call" or "oneway"';
  ERROR_INCOMPATIBLE_PROT = 'No service name found in "%s". Client is expected to use TMultiplexProtocol.';
  ERROR_UNKNOWN_SERVICE   = 'Service "%s" is not registered with MultiplexedProcessor';
var
  incoming  : TThriftMessage;  // header as read from the wire
  stripped  : TThriftMessage;  // header handed to the target processor
  sepPos    : Integer;
  svcName   : string;
  target    : IProcessor;
  decorated : IProtocol;
begin
  // Read the message header through the actual underlying protocol.
  incoming := iprot.readMessageBegin();

  // Only calls and oneway calls can be dispatched.
  if (incoming.Type_ <> TMessageType.Call) and (incoming.Type_ <> TMessageType.Oneway) then begin
    Error( oprot, incoming, TApplicationExceptionInvalidMessageType, ERROR_INVALID_MSGTYPE);
    Exit( FALSE);
  end;

  sepPos := Pos( TMultiplexedProtocol.SEPARATOR, incoming.Name);

  if sepPos <= 0 then begin
    // No separator, hence no service name: only the default processor can take it.
    if FDefaultProcessor = nil then begin
      Error( oprot, incoming,
             TApplicationExceptionInvalidProtocol,
             Format(ERROR_INCOMPATIBLE_PROT,[incoming.Name]));
      Exit( FALSE);
    end;

    target   := FDefaultProcessor;
    stripped := incoming;  // name carries no prefix, pass it on unchanged
  end
  else begin
    // Everything in front of the separator is the service name.
    svcName := Copy( incoming.Name, 1, sepPos-1);
    if not FServiceProcessorMap.TryGetValue( svcName, target) then begin
      Error( oprot, incoming,
             TApplicationExceptionInternalError,
             Format(ERROR_UNKNOWN_SERVICE,[svcName]));
      Exit( FALSE);
    end;

    // Build the header the service processor expects: same type and sequence
    // ID, but the name reduced to the part behind the separator.
    Inc( sepPos, Length(TMultiplexedProtocol.SEPARATOR));
    Init( stripped, Copy( incoming.Name, sepPos, MAXINT), incoming.Type_, incoming.SeqID);
  end;

  // The header was already consumed above, so the target processor gets a
  // decorator that returns the stored header from ReadMessageBegin.
  decorated := TStoredMessageProtocol.Create( iprot, stripped);
  Result := target.Process( decorated, oprot, events);
end;
229 | ||
230 | ||
231 | end. |