/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
29 TMultiplexedProtocol is a protocol-independent concrete decorator
30 that allows a Thrift client to communicate with a multiplexing Thrift server,
31 by prepending the service name to the function name during function calls.
33 NOTE: THIS IS NOT USED BY SERVERS. On the server, use TMultiplexedProcessor to handle request
34 from a multiplexing client.
36 This example uses a single socket transport to invoke two services:
38 socket := thrift.NewTSocketFromAddrTimeout(addr, TIMEOUT)
39 transport := thrift.NewTFramedTransport(socket)
40 protocol := thrift.NewTBinaryProtocolTransport(transport)
42 mp := thrift.NewTMultiplexedProtocol(protocol, "Calculator")
43 service := Calculator.NewCalculatorClient(mp)
45 mp2 := thrift.NewTMultiplexedProtocol(protocol, "WeatherReport")
46 service2 := WeatherReport.NewWeatherReportClient(mp2)
48 err := transport.Open()
50 t.Fatal("Unable to open client socket", err)
53 fmt.Println(service.Add(2,2))
54 fmt.Println(service2.GetTemperature())
57 type TMultiplexedProtocol struct {
// MULTIPLEXED_SEPARATOR is the delimiter placed between the service name and
// the function name in a multiplexed message name ("Service:function").
const MULTIPLEXED_SEPARATOR = ":"
64 func NewTMultiplexedProtocol(protocol TProtocol, serviceName string) *TMultiplexedProtocol {
65 return &TMultiplexedProtocol{
67 serviceName: serviceName,
71 func (t *TMultiplexedProtocol) WriteMessageBegin(name string, typeId TMessageType, seqid int32) error {
72 if typeId == CALL || typeId == ONEWAY {
73 return t.TProtocol.WriteMessageBegin(t.serviceName+MULTIPLEXED_SEPARATOR+name, typeId, seqid)
75 return t.TProtocol.WriteMessageBegin(name, typeId, seqid)
80 TMultiplexedProcessor is a TProcessor allowing
81 a single TServer to provide multiple services.
83 To do so, you instantiate the processor and then register additional
84 processors with it, as shown in the following example:
86 var processor = thrift.NewTMultiplexedProcessor()
89 processor.RegisterProcessor("FirstService", firstProcessor)
91 processor.registerProcessor(
93 Calculator.NewCalculatorProcessor(&CalculatorHandler{}),
96 processor.registerProcessor(
98 WeatherReport.NewWeatherReportProcessor(&WeatherReportHandler{}),
101 serverTransport, err := thrift.NewTServerSocketTimeout(addr, TIMEOUT)
103 t.Fatal("Unable to create server socket", err)
105 server := thrift.NewTSimpleServer2(processor, serverTransport)
109 type TMultiplexedProcessor struct {
110 serviceProcessorMap map[string]TProcessor
111 DefaultProcessor TProcessor
114 func NewTMultiplexedProcessor() *TMultiplexedProcessor {
115 return &TMultiplexedProcessor{
116 serviceProcessorMap: make(map[string]TProcessor),
120 func (t *TMultiplexedProcessor) RegisterDefault(processor TProcessor) {
121 t.DefaultProcessor = processor
124 func (t *TMultiplexedProcessor) RegisterProcessor(name string, processor TProcessor) {
125 if t.serviceProcessorMap == nil {
126 t.serviceProcessorMap = make(map[string]TProcessor)
128 t.serviceProcessorMap[name] = processor
131 func (t *TMultiplexedProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) {
132 name, typeId, seqid, err := in.ReadMessageBegin()
136 if typeId != CALL && typeId != ONEWAY {
137 return false, fmt.Errorf("Unexpected message type %v", typeId)
139 //extract the service name
140 v := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2)
142 if t.DefaultProcessor != nil {
143 smb := NewStoredMessageProtocol(in, name, typeId, seqid)
144 return t.DefaultProcessor.Process(ctx, smb, out)
146 return false, fmt.Errorf("Service name not found in message name: %s. Did you forget to use a TMultiplexProtocol in your client?", name)
148 actualProcessor, ok := t.serviceProcessorMap[v[0]]
150 return false, fmt.Errorf("Service name not found: %s. Did you forget to call registerProcessor()?", v[0])
152 smb := NewStoredMessageProtocol(in, v[1], typeId, seqid)
153 return actualProcessor.Process(ctx, smb, out)
156 //Protocol that use stored message for ReadMessageBegin
157 type storedMessageProtocol struct {
164 func NewStoredMessageProtocol(protocol TProtocol, name string, typeId TMessageType, seqid int32) *storedMessageProtocol {
165 return &storedMessageProtocol{protocol, name, typeId, seqid}
168 func (s *storedMessageProtocol) ReadMessageBegin() (name string, typeId TMessageType, seqid int32, err error) {
169 return s.name, s.typeId, s.seqid, nil