]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/go/thrift/simple_server.go
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / go / thrift / simple_server.go
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package thrift
21
22 import (
23 "log"
24 "runtime/debug"
25 "sync"
26 "sync/atomic"
27 )
28
29 /*
30 * This is not a typical TSimpleServer as it is not blocked after accept a socket.
31 * It is more like a TThreadedServer that can handle different connections in different goroutines.
32 * This will work if golang user implements a conn-pool like thing in client side.
33 */
34 type TSimpleServer struct {
35 closed int32
36 wg sync.WaitGroup
37 mu sync.Mutex
38
39 processorFactory TProcessorFactory
40 serverTransport TServerTransport
41 inputTransportFactory TTransportFactory
42 outputTransportFactory TTransportFactory
43 inputProtocolFactory TProtocolFactory
44 outputProtocolFactory TProtocolFactory
45
46 // Headers to auto forward in THeaderProtocol
47 forwardHeaders []string
48 }
49
50 func NewTSimpleServer2(processor TProcessor, serverTransport TServerTransport) *TSimpleServer {
51 return NewTSimpleServerFactory2(NewTProcessorFactory(processor), serverTransport)
52 }
53
54 func NewTSimpleServer4(processor TProcessor, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer {
55 return NewTSimpleServerFactory4(NewTProcessorFactory(processor),
56 serverTransport,
57 transportFactory,
58 protocolFactory,
59 )
60 }
61
62 func NewTSimpleServer6(processor TProcessor, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer {
63 return NewTSimpleServerFactory6(NewTProcessorFactory(processor),
64 serverTransport,
65 inputTransportFactory,
66 outputTransportFactory,
67 inputProtocolFactory,
68 outputProtocolFactory,
69 )
70 }
71
72 func NewTSimpleServerFactory2(processorFactory TProcessorFactory, serverTransport TServerTransport) *TSimpleServer {
73 return NewTSimpleServerFactory6(processorFactory,
74 serverTransport,
75 NewTTransportFactory(),
76 NewTTransportFactory(),
77 NewTBinaryProtocolFactoryDefault(),
78 NewTBinaryProtocolFactoryDefault(),
79 )
80 }
81
82 func NewTSimpleServerFactory4(processorFactory TProcessorFactory, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer {
83 return NewTSimpleServerFactory6(processorFactory,
84 serverTransport,
85 transportFactory,
86 transportFactory,
87 protocolFactory,
88 protocolFactory,
89 )
90 }
91
92 func NewTSimpleServerFactory6(processorFactory TProcessorFactory, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer {
93 return &TSimpleServer{
94 processorFactory: processorFactory,
95 serverTransport: serverTransport,
96 inputTransportFactory: inputTransportFactory,
97 outputTransportFactory: outputTransportFactory,
98 inputProtocolFactory: inputProtocolFactory,
99 outputProtocolFactory: outputProtocolFactory,
100 }
101 }
102
103 func (p *TSimpleServer) ProcessorFactory() TProcessorFactory {
104 return p.processorFactory
105 }
106
107 func (p *TSimpleServer) ServerTransport() TServerTransport {
108 return p.serverTransport
109 }
110
111 func (p *TSimpleServer) InputTransportFactory() TTransportFactory {
112 return p.inputTransportFactory
113 }
114
115 func (p *TSimpleServer) OutputTransportFactory() TTransportFactory {
116 return p.outputTransportFactory
117 }
118
119 func (p *TSimpleServer) InputProtocolFactory() TProtocolFactory {
120 return p.inputProtocolFactory
121 }
122
123 func (p *TSimpleServer) OutputProtocolFactory() TProtocolFactory {
124 return p.outputProtocolFactory
125 }
126
127 func (p *TSimpleServer) Listen() error {
128 return p.serverTransport.Listen()
129 }
130
131 // SetForwardHeaders sets the list of header keys that will be auto forwarded
132 // while using THeaderProtocol.
133 //
134 // "forward" means that when the server is also a client to other upstream
135 // thrift servers, the context object user gets in the processor functions will
136 // have both read and write headers set, with write headers being forwarded.
137 // Users can always override the write headers by calling SetWriteHeaderList
138 // before calling thrift client functions.
139 func (p *TSimpleServer) SetForwardHeaders(headers []string) {
140 size := len(headers)
141 if size == 0 {
142 p.forwardHeaders = nil
143 return
144 }
145
146 keys := make([]string, size)
147 copy(keys, headers)
148 p.forwardHeaders = keys
149 }
150
151 func (p *TSimpleServer) innerAccept() (int32, error) {
152 client, err := p.serverTransport.Accept()
153 p.mu.Lock()
154 defer p.mu.Unlock()
155 closed := atomic.LoadInt32(&p.closed)
156 if closed != 0 {
157 return closed, nil
158 }
159 if err != nil {
160 return 0, err
161 }
162 if client != nil {
163 p.wg.Add(1)
164 go func() {
165 defer p.wg.Done()
166 if err := p.processRequests(client); err != nil {
167 log.Println("error processing request:", err)
168 }
169 }()
170 }
171 return 0, nil
172 }
173
174 func (p *TSimpleServer) AcceptLoop() error {
175 for {
176 closed, err := p.innerAccept()
177 if err != nil {
178 return err
179 }
180 if closed != 0 {
181 return nil
182 }
183 }
184 }
185
186 func (p *TSimpleServer) Serve() error {
187 err := p.Listen()
188 if err != nil {
189 return err
190 }
191 p.AcceptLoop()
192 return nil
193 }
194
195 func (p *TSimpleServer) Stop() error {
196 p.mu.Lock()
197 defer p.mu.Unlock()
198 if atomic.LoadInt32(&p.closed) != 0 {
199 return nil
200 }
201 atomic.StoreInt32(&p.closed, 1)
202 p.serverTransport.Interrupt()
203 p.wg.Wait()
204 return nil
205 }
206
207 func (p *TSimpleServer) processRequests(client TTransport) error {
208 processor := p.processorFactory.GetProcessor(client)
209 inputTransport, err := p.inputTransportFactory.GetTransport(client)
210 if err != nil {
211 return err
212 }
213 inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
214 var outputTransport TTransport
215 var outputProtocol TProtocol
216
217 // for THeaderProtocol, we must use the same protocol instance for
218 // input and output so that the response is in the same dialect that
219 // the server detected the request was in.
220 headerProtocol, ok := inputProtocol.(*THeaderProtocol)
221 if ok {
222 outputProtocol = inputProtocol
223 } else {
224 oTrans, err := p.outputTransportFactory.GetTransport(client)
225 if err != nil {
226 return err
227 }
228 outputTransport = oTrans
229 outputProtocol = p.outputProtocolFactory.GetProtocol(outputTransport)
230 }
231
232 defer func() {
233 if e := recover(); e != nil {
234 log.Printf("panic in processor: %s: %s", e, debug.Stack())
235 }
236 }()
237
238 if inputTransport != nil {
239 defer inputTransport.Close()
240 }
241 if outputTransport != nil {
242 defer outputTransport.Close()
243 }
244 for {
245 if atomic.LoadInt32(&p.closed) != 0 {
246 return nil
247 }
248
249 ctx := defaultCtx
250 if headerProtocol != nil {
251 // We need to call ReadFrame here, otherwise we won't
252 // get any headers on the AddReadTHeaderToContext call.
253 //
254 // ReadFrame is safe to be called multiple times so it
255 // won't break when it's called again later when we
256 // actually start to read the message.
257 if err := headerProtocol.ReadFrame(); err != nil {
258 return err
259 }
260 ctx = AddReadTHeaderToContext(defaultCtx, headerProtocol.GetReadHeaders())
261 ctx = SetWriteHeaderList(ctx, p.forwardHeaders)
262 }
263
264 ok, err := processor.Process(ctx, inputProtocol, outputProtocol)
265 if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
266 return nil
267 } else if err != nil {
268 return err
269 }
270 if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
271 continue
272 }
273 if !ok {
274 break
275 }
276 }
277 return nil
278 }