2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
30 * This is not a typical TSimpleServer, as it does not block after accepting a socket.
31 * It is more like a TThreadedServer that can handle different connections in different goroutines.
32 * This will work if the Go user implements something like a connection pool on the client side.
34 type TSimpleServer struct {
// processorFactory supplies the TProcessor for each accepted client
// transport (processRequests calls GetProcessor on it).
39 processorFactory TProcessorFactory
// serverTransport is the listening transport; Listen, Accept and Interrupt
// are invoked on it by Listen, innerAccept and Stop respectively.
40 serverTransport TServerTransport
// inputTransportFactory and outputTransportFactory wrap the accepted client
// transport in processRequests.
41 inputTransportFactory TTransportFactory
42 outputTransportFactory TTransportFactory
// inputProtocolFactory and outputProtocolFactory build the protocols on top
// of the wrapped input and output transports in processRequests.
43 inputProtocolFactory TProtocolFactory
44 outputProtocolFactory TProtocolFactory
46 // Headers to auto forward in THeaderProtocol
// Set through SetForwardHeaders and passed to SetWriteHeaderList in
// processRequests.
47 forwardHeaders []string
// NewTSimpleServer2 wraps processor with NewTProcessorFactory and delegates
// to NewTSimpleServerFactory2 with the given server transport.
50 func NewTSimpleServer2(processor TProcessor, serverTransport TServerTransport) *TSimpleServer {
51 return NewTSimpleServerFactory2(NewTProcessorFactory(processor), serverTransport)
// NewTSimpleServer4 wraps processor with NewTProcessorFactory and delegates
// to NewTSimpleServerFactory4.
// NOTE(review): presumably serverTransport, transportFactory and
// protocolFactory are forwarded unchanged as the remaining arguments — confirm.
54 func NewTSimpleServer4(processor TProcessor, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer {
55 return NewTSimpleServerFactory4(NewTProcessorFactory(processor),
// NewTSimpleServer6 wraps processor with NewTProcessorFactory and delegates
// to NewTSimpleServerFactory6, forwarding the separate input and output
// transport and protocol factories.
62 func NewTSimpleServer6(processor TProcessor, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer {
63 return NewTSimpleServerFactory6(NewTProcessorFactory(processor),
65 inputTransportFactory,
66 outputTransportFactory,
68 outputProtocolFactory,
// NewTSimpleServerFactory2 delegates to NewTSimpleServerFactory6 using
// defaults: NewTTransportFactory() for both transport factories and
// NewTBinaryProtocolFactoryDefault() for both protocol factories.
72 func NewTSimpleServerFactory2(processorFactory TProcessorFactory, serverTransport TServerTransport) *TSimpleServer {
73 return NewTSimpleServerFactory6(processorFactory,
75 NewTTransportFactory(),
76 NewTTransportFactory(),
77 NewTBinaryProtocolFactoryDefault(),
78 NewTBinaryProtocolFactoryDefault(),
// NewTSimpleServerFactory4 delegates to NewTSimpleServerFactory6.
// NOTE(review): presumably transportFactory and protocolFactory are each
// used for both the input and the output side — confirm against the call's
// remaining arguments.
82 func NewTSimpleServerFactory4(processorFactory TProcessorFactory, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer {
83 return NewTSimpleServerFactory6(processorFactory,
// NewTSimpleServerFactory6 is the constructor the other NewTSimpleServer*
// functions funnel into. It returns a new TSimpleServer whose fields are set
// directly from the same-named arguments; no argument is validated here.
92 func NewTSimpleServerFactory6(processorFactory TProcessorFactory, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer {
93 return &TSimpleServer{
94 processorFactory: processorFactory,
95 serverTransport: serverTransport,
96 inputTransportFactory: inputTransportFactory,
97 outputTransportFactory: outputTransportFactory,
98 inputProtocolFactory: inputProtocolFactory,
99 outputProtocolFactory: outputProtocolFactory,
// ProcessorFactory returns the server's processor factory.
103 func (p *TSimpleServer) ProcessorFactory() TProcessorFactory {
104 return p.processorFactory
// ServerTransport returns the server's listening transport.
107 func (p *TSimpleServer) ServerTransport() TServerTransport {
108 return p.serverTransport
// InputTransportFactory returns the factory used to wrap client transports
// for input.
111 func (p *TSimpleServer) InputTransportFactory() TTransportFactory {
112 return p.inputTransportFactory
// OutputTransportFactory returns the factory used to wrap client transports
// for output.
115 func (p *TSimpleServer) OutputTransportFactory() TTransportFactory {
116 return p.outputTransportFactory
// InputProtocolFactory returns the factory used to build input protocols.
119 func (p *TSimpleServer) InputProtocolFactory() TProtocolFactory {
120 return p.inputProtocolFactory
// OutputProtocolFactory returns the factory used to build output protocols.
123 func (p *TSimpleServer) OutputProtocolFactory() TProtocolFactory {
124 return p.outputProtocolFactory
// Listen calls Listen on the server transport and returns its error
// unchanged.
127 func (p *TSimpleServer) Listen() error {
128 return p.serverTransport.Listen()
131 // SetForwardHeaders sets the list of header keys that will be auto forwarded
132 // while using THeaderProtocol.
134 // "forward" means that when the server is also a client to other upstream
135 // thrift servers, the context object user gets in the processor functions will
136 // have both read and write headers set, with write headers being forwarded.
137 // Users can always override the write headers by calling SetWriteHeaderList
138 // before calling thrift client functions.
139 func (p *TSimpleServer) SetForwardHeaders(headers []string) {
// One path clears the stored list entirely.
// NOTE(review): presumably taken when headers is empty — confirm.
142 p.forwardHeaders = nil
// The server stores a freshly allocated slice rather than the caller's.
// NOTE(review): presumably headers is copied into keys before the
// assignment below — confirm.
146 keys := make([]string, size)
148 p.forwardHeaders = keys
// innerAccept accepts one client from the server transport and hands it to
// processRequests, logging any error that processRequests reports. It also
// reads the closed flag atomically.
// NOTE(review): presumably the int32 result is that closed flag, letting the
// caller tell a stopped server apart from an accept error — confirm.
151 func (p *TSimpleServer) innerAccept() (int32, error) {
152 client, err := p.serverTransport.Accept()
// Atomic load of the same flag that Stop stores atomically.
155 closed := atomic.LoadInt32(&p.closed)
166 if err := p.processRequests(client); err != nil {
167 log.Println("error processing request:", err)
// AcceptLoop drives innerAccept, receiving its closed value and error.
// NOTE(review): presumably this repeats until innerAccept reports an error or
// a non-zero closed value — confirm the loop and exit conditions.
174 func (p *TSimpleServer) AcceptLoop() error {
176 closed, err := p.innerAccept()
// Serve runs the server and returns an error.
// NOTE(review): presumably this calls Listen and then AcceptLoop — TODO
// confirm against the method body.
186 func (p *TSimpleServer) Serve() error {
// Stop sets the closed flag to 1 and interrupts the server transport.
// NOTE(review): the flag is checked first; presumably an already-stopped
// server returns early without interrupting the transport again — confirm.
195 func (p *TSimpleServer) Stop() error {
198 if atomic.LoadInt32(&p.closed) != 0 {
// Atomic store of the flag that innerAccept and processRequests load.
201 atomic.StoreInt32(&p.closed, 1)
202 p.serverTransport.Interrupt()
207 func (p *TSimpleServer) processRequests(client TTransport) error {
208 processor := p.processorFactory.GetProcessor(client)
209 inputTransport, err := p.inputTransportFactory.GetTransport(client)
213 inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
214 var outputTransport TTransport
215 var outputProtocol TProtocol
217 // for THeaderProtocol, we must use the same protocol instance for
218 // input and output so that the response is in the same dialect that
219 // the server detected the request was in.
220 headerProtocol, ok := inputProtocol.(*THeaderProtocol)
222 outputProtocol = inputProtocol
224 oTrans, err := p.outputTransportFactory.GetTransport(client)
228 outputTransport = oTrans
229 outputProtocol = p.outputProtocolFactory.GetProtocol(outputTransport)
233 if e := recover(); e != nil {
234 log.Printf("panic in processor: %s: %s", e, debug.Stack())
238 if inputTransport != nil {
239 defer inputTransport.Close()
241 if outputTransport != nil {
242 defer outputTransport.Close()
245 if atomic.LoadInt32(&p.closed) != 0 {
250 if headerProtocol != nil {
251 // We need to call ReadFrame here, otherwise we won't
252 // get any headers on the AddReadTHeaderToContext call.
254 // ReadFrame is safe to be called multiple times so it
255 // won't break when it's called again later when we
256 // actually start to read the message.
257 if err := headerProtocol.ReadFrame(); err != nil {
260 ctx = AddReadTHeaderToContext(defaultCtx, headerProtocol.GetReadHeaders())
261 ctx = SetWriteHeaderList(ctx, p.forwardHeaders)
264 ok, err := processor.Process(ctx, inputProtocol, outputProtocol)
265 if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
267 } else if err != nil {
270 if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {