]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | package thrift | |
21 | ||
22 | import ( | |
23 | "log" | |
24 | "runtime/debug" | |
25 | "sync" | |
26 | "sync/atomic" | |
27 | ) | |
28 | ||
29 | /* | |
30 | * This is not a typical TSimpleServer as it is not blocked after accept a socket. | |
31 | * It is more like a TThreadedServer that can handle different connections in different goroutines. | |
32 | * This will work if golang user implements a conn-pool like thing in client side. | |
33 | */ | |
34 | type TSimpleServer struct { | |
35 | closed int32 | |
36 | wg sync.WaitGroup | |
37 | mu sync.Mutex | |
38 | ||
39 | processorFactory TProcessorFactory | |
40 | serverTransport TServerTransport | |
41 | inputTransportFactory TTransportFactory | |
42 | outputTransportFactory TTransportFactory | |
43 | inputProtocolFactory TProtocolFactory | |
44 | outputProtocolFactory TProtocolFactory | |
45 | ||
46 | // Headers to auto forward in THeaderProtocol | |
47 | forwardHeaders []string | |
48 | } | |
49 | ||
50 | func NewTSimpleServer2(processor TProcessor, serverTransport TServerTransport) *TSimpleServer { | |
51 | return NewTSimpleServerFactory2(NewTProcessorFactory(processor), serverTransport) | |
52 | } | |
53 | ||
54 | func NewTSimpleServer4(processor TProcessor, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer { | |
55 | return NewTSimpleServerFactory4(NewTProcessorFactory(processor), | |
56 | serverTransport, | |
57 | transportFactory, | |
58 | protocolFactory, | |
59 | ) | |
60 | } | |
61 | ||
62 | func NewTSimpleServer6(processor TProcessor, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer { | |
63 | return NewTSimpleServerFactory6(NewTProcessorFactory(processor), | |
64 | serverTransport, | |
65 | inputTransportFactory, | |
66 | outputTransportFactory, | |
67 | inputProtocolFactory, | |
68 | outputProtocolFactory, | |
69 | ) | |
70 | } | |
71 | ||
72 | func NewTSimpleServerFactory2(processorFactory TProcessorFactory, serverTransport TServerTransport) *TSimpleServer { | |
73 | return NewTSimpleServerFactory6(processorFactory, | |
74 | serverTransport, | |
75 | NewTTransportFactory(), | |
76 | NewTTransportFactory(), | |
77 | NewTBinaryProtocolFactoryDefault(), | |
78 | NewTBinaryProtocolFactoryDefault(), | |
79 | ) | |
80 | } | |
81 | ||
82 | func NewTSimpleServerFactory4(processorFactory TProcessorFactory, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer { | |
83 | return NewTSimpleServerFactory6(processorFactory, | |
84 | serverTransport, | |
85 | transportFactory, | |
86 | transportFactory, | |
87 | protocolFactory, | |
88 | protocolFactory, | |
89 | ) | |
90 | } | |
91 | ||
92 | func NewTSimpleServerFactory6(processorFactory TProcessorFactory, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer { | |
93 | return &TSimpleServer{ | |
94 | processorFactory: processorFactory, | |
95 | serverTransport: serverTransport, | |
96 | inputTransportFactory: inputTransportFactory, | |
97 | outputTransportFactory: outputTransportFactory, | |
98 | inputProtocolFactory: inputProtocolFactory, | |
99 | outputProtocolFactory: outputProtocolFactory, | |
100 | } | |
101 | } | |
102 | ||
103 | func (p *TSimpleServer) ProcessorFactory() TProcessorFactory { | |
104 | return p.processorFactory | |
105 | } | |
106 | ||
107 | func (p *TSimpleServer) ServerTransport() TServerTransport { | |
108 | return p.serverTransport | |
109 | } | |
110 | ||
111 | func (p *TSimpleServer) InputTransportFactory() TTransportFactory { | |
112 | return p.inputTransportFactory | |
113 | } | |
114 | ||
115 | func (p *TSimpleServer) OutputTransportFactory() TTransportFactory { | |
116 | return p.outputTransportFactory | |
117 | } | |
118 | ||
119 | func (p *TSimpleServer) InputProtocolFactory() TProtocolFactory { | |
120 | return p.inputProtocolFactory | |
121 | } | |
122 | ||
123 | func (p *TSimpleServer) OutputProtocolFactory() TProtocolFactory { | |
124 | return p.outputProtocolFactory | |
125 | } | |
126 | ||
127 | func (p *TSimpleServer) Listen() error { | |
128 | return p.serverTransport.Listen() | |
129 | } | |
130 | ||
131 | // SetForwardHeaders sets the list of header keys that will be auto forwarded | |
132 | // while using THeaderProtocol. | |
133 | // | |
134 | // "forward" means that when the server is also a client to other upstream | |
135 | // thrift servers, the context object user gets in the processor functions will | |
136 | // have both read and write headers set, with write headers being forwarded. | |
137 | // Users can always override the write headers by calling SetWriteHeaderList | |
138 | // before calling thrift client functions. | |
139 | func (p *TSimpleServer) SetForwardHeaders(headers []string) { | |
140 | size := len(headers) | |
141 | if size == 0 { | |
142 | p.forwardHeaders = nil | |
143 | return | |
144 | } | |
145 | ||
146 | keys := make([]string, size) | |
147 | copy(keys, headers) | |
148 | p.forwardHeaders = keys | |
149 | } | |
150 | ||
151 | func (p *TSimpleServer) innerAccept() (int32, error) { | |
152 | client, err := p.serverTransport.Accept() | |
153 | p.mu.Lock() | |
154 | defer p.mu.Unlock() | |
155 | closed := atomic.LoadInt32(&p.closed) | |
156 | if closed != 0 { | |
157 | return closed, nil | |
158 | } | |
159 | if err != nil { | |
160 | return 0, err | |
161 | } | |
162 | if client != nil { | |
163 | p.wg.Add(1) | |
164 | go func() { | |
165 | defer p.wg.Done() | |
166 | if err := p.processRequests(client); err != nil { | |
167 | log.Println("error processing request:", err) | |
168 | } | |
169 | }() | |
170 | } | |
171 | return 0, nil | |
172 | } | |
173 | ||
174 | func (p *TSimpleServer) AcceptLoop() error { | |
175 | for { | |
176 | closed, err := p.innerAccept() | |
177 | if err != nil { | |
178 | return err | |
179 | } | |
180 | if closed != 0 { | |
181 | return nil | |
182 | } | |
183 | } | |
184 | } | |
185 | ||
186 | func (p *TSimpleServer) Serve() error { | |
187 | err := p.Listen() | |
188 | if err != nil { | |
189 | return err | |
190 | } | |
191 | p.AcceptLoop() | |
192 | return nil | |
193 | } | |
194 | ||
195 | func (p *TSimpleServer) Stop() error { | |
196 | p.mu.Lock() | |
197 | defer p.mu.Unlock() | |
198 | if atomic.LoadInt32(&p.closed) != 0 { | |
199 | return nil | |
200 | } | |
201 | atomic.StoreInt32(&p.closed, 1) | |
202 | p.serverTransport.Interrupt() | |
203 | p.wg.Wait() | |
204 | return nil | |
205 | } | |
206 | ||
207 | func (p *TSimpleServer) processRequests(client TTransport) error { | |
208 | processor := p.processorFactory.GetProcessor(client) | |
209 | inputTransport, err := p.inputTransportFactory.GetTransport(client) | |
210 | if err != nil { | |
211 | return err | |
212 | } | |
213 | inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport) | |
214 | var outputTransport TTransport | |
215 | var outputProtocol TProtocol | |
216 | ||
217 | // for THeaderProtocol, we must use the same protocol instance for | |
218 | // input and output so that the response is in the same dialect that | |
219 | // the server detected the request was in. | |
220 | headerProtocol, ok := inputProtocol.(*THeaderProtocol) | |
221 | if ok { | |
222 | outputProtocol = inputProtocol | |
223 | } else { | |
224 | oTrans, err := p.outputTransportFactory.GetTransport(client) | |
225 | if err != nil { | |
226 | return err | |
227 | } | |
228 | outputTransport = oTrans | |
229 | outputProtocol = p.outputProtocolFactory.GetProtocol(outputTransport) | |
230 | } | |
231 | ||
232 | defer func() { | |
233 | if e := recover(); e != nil { | |
234 | log.Printf("panic in processor: %s: %s", e, debug.Stack()) | |
235 | } | |
236 | }() | |
237 | ||
238 | if inputTransport != nil { | |
239 | defer inputTransport.Close() | |
240 | } | |
241 | if outputTransport != nil { | |
242 | defer outputTransport.Close() | |
243 | } | |
244 | for { | |
245 | if atomic.LoadInt32(&p.closed) != 0 { | |
246 | return nil | |
247 | } | |
248 | ||
249 | ctx := defaultCtx | |
250 | if headerProtocol != nil { | |
251 | // We need to call ReadFrame here, otherwise we won't | |
252 | // get any headers on the AddReadTHeaderToContext call. | |
253 | // | |
254 | // ReadFrame is safe to be called multiple times so it | |
255 | // won't break when it's called again later when we | |
256 | // actually start to read the message. | |
257 | if err := headerProtocol.ReadFrame(); err != nil { | |
258 | return err | |
259 | } | |
260 | ctx = AddReadTHeaderToContext(defaultCtx, headerProtocol.GetReadHeaders()) | |
261 | ctx = SetWriteHeaderList(ctx, p.forwardHeaders) | |
262 | } | |
263 | ||
264 | ok, err := processor.Process(ctx, inputProtocol, outputProtocol) | |
265 | if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE { | |
266 | return nil | |
267 | } else if err != nil { | |
268 | return err | |
269 | } | |
270 | if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD { | |
271 | continue | |
272 | } | |
273 | if !ok { | |
274 | break | |
275 | } | |
276 | } | |
277 | return nil | |
278 | } |