]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/delphi/src/Thrift.Processor.Multiplex.pas
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / delphi / src / Thrift.Processor.Multiplex.pas
CommitLineData
f67539c2
TL
1(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
20unit Thrift.Processor.Multiplex;
21
22
23interface
24
25uses
26 SysUtils,
27 Generics.Collections,
28 Thrift,
29 Thrift.Protocol,
30 Thrift.Protocol.Multiplex;
31
32{ TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services.
33 To do so, you instantiate the processor and then register additional processors with it,
34 as shown in the following example:
35
36
37 TMultiplexedProcessor processor = new TMultiplexedProcessor();
38
39 processor.registerProcessor(
40 "Calculator",
41 new Calculator.Processor(new CalculatorHandler()));
42
43 processor.registerProcessor(
44 "WeatherReport",
45 new WeatherReport.Processor(new WeatherReportHandler()));
46
47 TServerTransport t = new TServerSocket(9090);
48 TSimpleServer server = new TSimpleServer(processor, t);
49
50 server.serve();
51}
52
53
54type
55 IMultiplexedProcessor = interface( IProcessor)
56 ['{807F9D19-6CF4-4789-840E-93E87A12EB63}']
57 // Register a service with this TMultiplexedProcessor. This allows us
58 // to broker requests to individual services by using the service name
59 // to select them at request time.
60 procedure RegisterProcessor( const serviceName : String; const processor : IProcessor; const asDefault : Boolean = FALSE);
61 end;
62
63
64 TMultiplexedProcessorImpl = class( TInterfacedObject, IMultiplexedProcessor, IProcessor)
65 private type
66 // Our goal was to work with any protocol. In order to do that, we needed
67 // to allow them to call readMessageBegin() and get a TMessage in exactly
68 // the standard format, without the service name prepended to TMessage.name.
69 TStoredMessageProtocol = class( TProtocolDecorator)
70 private
71 FMessageBegin : TThriftMessage;
72 public
73 constructor Create( const protocol : IProtocol; const aMsgBegin : TThriftMessage);
74 function ReadMessageBegin: TThriftMessage; override;
75 end;
76
77 private
78 FServiceProcessorMap : TDictionary<String, IProcessor>;
79 FDefaultProcessor : IProcessor;
80
81 procedure Error( const oprot : IProtocol; const msg : TThriftMessage;
82 extype : TApplicationExceptionSpecializedClass; const etxt : string);
83
84 public
85 constructor Create;
86 destructor Destroy; override;
87
88 // Register a service with this TMultiplexedProcessorImpl. This allows us
89 // to broker requests to individual services by using the service name
90 // to select them at request time.
91 procedure RegisterProcessor( const serviceName : String; const processor : IProcessor; const asDefault : Boolean = FALSE);
92
93 { This implementation of process performs the following steps:
94 - Read the beginning of the message.
95 - Extract the service name from the message.
96 - Using the service name to locate the appropriate processor.
97 - Dispatch to the processor, with a decorated instance of TProtocol
98 that allows readMessageBegin() to return the original TMessage.
99
100 An exception is thrown if the message type is not CALL or ONEWAY
101 or if the service is unknown (or not properly registered).
102 }
103 function Process( const iprot, oprot: IProtocol; const events : IProcessorEvents = nil): Boolean;
104 end;
105
106
107implementation
108
109constructor TMultiplexedProcessorImpl.TStoredMessageProtocol.Create( const protocol : IProtocol; const aMsgBegin : TThriftMessage);
110begin
111 inherited Create( protocol);
112 FMessageBegin := aMsgBegin;
113end;
114
115
116function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: TThriftMessage;
117begin
118 result := FMessageBegin;
119end;
120
121
122constructor TMultiplexedProcessorImpl.Create;
123begin
124 inherited Create;
125 FServiceProcessorMap := TDictionary<string,IProcessor>.Create;
126end;
127
128
129destructor TMultiplexedProcessorImpl.Destroy;
130begin
131 try
132 FreeAndNil( FServiceProcessorMap);
133 finally
134 inherited Destroy;
135 end;
136end;
137
138
139procedure TMultiplexedProcessorImpl.RegisterProcessor( const serviceName : String; const processor : IProcessor; const asDefault : Boolean);
140begin
141 FServiceProcessorMap.Add( serviceName, processor);
142
143 if asDefault then begin
144 if FDefaultProcessor = nil
145 then FDefaultProcessor := processor
146 else raise TApplicationExceptionInternalError.Create('Only one default service allowed');
147 end;
148end;
149
150
151procedure TMultiplexedProcessorImpl.Error( const oprot : IProtocol; const msg : TThriftMessage;
152 extype : TApplicationExceptionSpecializedClass;
153 const etxt : string);
154var appex : TApplicationException;
155 newMsg : TThriftMessage;
156begin
157 appex := extype.Create(etxt);
158 try
159 Init( newMsg, msg.Name, TMessageType.Exception, msg.SeqID);
160
161 oprot.WriteMessageBegin(newMsg);
162 appex.Write(oprot);
163 oprot.WriteMessageEnd();
164 oprot.Transport.Flush();
165
166 finally
167 appex.Free;
168 end;
169end;
170
171
172function TMultiplexedProcessorImpl.Process(const iprot, oprot : IProtocol; const events : IProcessorEvents = nil): Boolean;
173var msg, newMsg : TThriftMessage;
174 idx : Integer;
175 sService : string;
176 processor : IProcessor;
177 protocol : IProtocol;
178const
179 ERROR_INVALID_MSGTYPE = 'Message must be "call" or "oneway"';
180 ERROR_INCOMPATIBLE_PROT = 'No service name found in "%s". Client is expected to use TMultiplexProtocol.';
181 ERROR_UNKNOWN_SERVICE = 'Service "%s" is not registered with MultiplexedProcessor';
182begin
183 // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the message header.
184 // This pulls the message "off the wire", which we'll deal with at the end of this method.
185 msg := iprot.readMessageBegin();
186 if not (msg.Type_ in [TMessageType.Call, TMessageType.Oneway]) then begin
187 Error( oprot, msg,
188 TApplicationExceptionInvalidMessageType,
189 ERROR_INVALID_MSGTYPE);
190 Exit( FALSE);
191 end;
192
193 // Extract the service name
194 // use FDefaultProcessor as fallback if there is no separator
195 idx := Pos( TMultiplexedProtocol.SEPARATOR, msg.Name);
196 if idx > 0 then begin
197
198 // Create a new TMessage, something that can be consumed by any TProtocol
199 sService := Copy( msg.Name, 1, idx-1);
200 if not FServiceProcessorMap.TryGetValue( sService, processor)
201 then begin
202 Error( oprot, msg,
203 TApplicationExceptionInternalError,
204 Format(ERROR_UNKNOWN_SERVICE,[sService]));
205 Exit( FALSE);
206 end;
207
208 // Create a new TMessage, removing the service name
209 Inc( idx, Length(TMultiplexedProtocol.SEPARATOR));
210 Init( newMsg, Copy( msg.Name, idx, MAXINT), msg.Type_, msg.SeqID);
211
212 end
213 else if FDefaultProcessor <> nil then begin
214 processor := FDefaultProcessor;
215 newMsg := msg; // no need to change
216
217 end
218 else begin
219 Error( oprot, msg,
220 TApplicationExceptionInvalidProtocol,
221 Format(ERROR_INCOMPATIBLE_PROT,[msg.Name]));
222 Exit( FALSE);
223 end;
224
225 // Dispatch processing to the stored processor
226 protocol := TStoredMessageProtocol.Create( iprot, newMsg);
227 result := processor.process( protocol, oprot, events);
228end;
229
230
231end.