]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/go/thrift/multiplexed_protocol.go
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / go / thrift / multiplexed_protocol.go
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package thrift
21
22 import (
23 "context"
24 "fmt"
25 "strings"
26 )
27
28 /*
29 TMultiplexedProtocol is a protocol-independent concrete decorator
30 that allows a Thrift client to communicate with a multiplexing Thrift server,
31 by prepending the service name to the function name during function calls.
32
33 NOTE: THIS IS NOT USED BY SERVERS. On the server, use TMultiplexedProcessor to handle request
34 from a multiplexing client.
35
36 This example uses a single socket transport to invoke two services:
37
38 socket := thrift.NewTSocketFromAddrTimeout(addr, TIMEOUT)
39 transport := thrift.NewTFramedTransport(socket)
40 protocol := thrift.NewTBinaryProtocolTransport(transport)
41
42 mp := thrift.NewTMultiplexedProtocol(protocol, "Calculator")
43 service := Calculator.NewCalculatorClient(mp)
44
45 mp2 := thrift.NewTMultiplexedProtocol(protocol, "WeatherReport")
46 service2 := WeatherReport.NewWeatherReportClient(mp2)
47
48 err := transport.Open()
49 if err != nil {
50 t.Fatal("Unable to open client socket", err)
51 }
52
53 fmt.Println(service.Add(2,2))
54 fmt.Println(service2.GetTemperature())
55 */
56
57 type TMultiplexedProtocol struct {
58 TProtocol
59 serviceName string
60 }
61
62 const MULTIPLEXED_SEPARATOR = ":"
63
64 func NewTMultiplexedProtocol(protocol TProtocol, serviceName string) *TMultiplexedProtocol {
65 return &TMultiplexedProtocol{
66 TProtocol: protocol,
67 serviceName: serviceName,
68 }
69 }
70
71 func (t *TMultiplexedProtocol) WriteMessageBegin(name string, typeId TMessageType, seqid int32) error {
72 if typeId == CALL || typeId == ONEWAY {
73 return t.TProtocol.WriteMessageBegin(t.serviceName+MULTIPLEXED_SEPARATOR+name, typeId, seqid)
74 } else {
75 return t.TProtocol.WriteMessageBegin(name, typeId, seqid)
76 }
77 }
78
79 /*
80 TMultiplexedProcessor is a TProcessor allowing
81 a single TServer to provide multiple services.
82
83 To do so, you instantiate the processor and then register additional
84 processors with it, as shown in the following example:
85
86 var processor = thrift.NewTMultiplexedProcessor()
87
88 firstProcessor :=
89 processor.RegisterProcessor("FirstService", firstProcessor)
90
91 processor.registerProcessor(
92 "Calculator",
93 Calculator.NewCalculatorProcessor(&CalculatorHandler{}),
94 )
95
96 processor.registerProcessor(
97 "WeatherReport",
98 WeatherReport.NewWeatherReportProcessor(&WeatherReportHandler{}),
99 )
100
101 serverTransport, err := thrift.NewTServerSocketTimeout(addr, TIMEOUT)
102 if err != nil {
103 t.Fatal("Unable to create server socket", err)
104 }
105 server := thrift.NewTSimpleServer2(processor, serverTransport)
106 server.Serve();
107 */
108
109 type TMultiplexedProcessor struct {
110 serviceProcessorMap map[string]TProcessor
111 DefaultProcessor TProcessor
112 }
113
114 func NewTMultiplexedProcessor() *TMultiplexedProcessor {
115 return &TMultiplexedProcessor{
116 serviceProcessorMap: make(map[string]TProcessor),
117 }
118 }
119
120 func (t *TMultiplexedProcessor) RegisterDefault(processor TProcessor) {
121 t.DefaultProcessor = processor
122 }
123
124 func (t *TMultiplexedProcessor) RegisterProcessor(name string, processor TProcessor) {
125 if t.serviceProcessorMap == nil {
126 t.serviceProcessorMap = make(map[string]TProcessor)
127 }
128 t.serviceProcessorMap[name] = processor
129 }
130
131 func (t *TMultiplexedProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) {
132 name, typeId, seqid, err := in.ReadMessageBegin()
133 if err != nil {
134 return false, err
135 }
136 if typeId != CALL && typeId != ONEWAY {
137 return false, fmt.Errorf("Unexpected message type %v", typeId)
138 }
139 //extract the service name
140 v := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2)
141 if len(v) != 2 {
142 if t.DefaultProcessor != nil {
143 smb := NewStoredMessageProtocol(in, name, typeId, seqid)
144 return t.DefaultProcessor.Process(ctx, smb, out)
145 }
146 return false, fmt.Errorf("Service name not found in message name: %s. Did you forget to use a TMultiplexProtocol in your client?", name)
147 }
148 actualProcessor, ok := t.serviceProcessorMap[v[0]]
149 if !ok {
150 return false, fmt.Errorf("Service name not found: %s. Did you forget to call registerProcessor()?", v[0])
151 }
152 smb := NewStoredMessageProtocol(in, v[1], typeId, seqid)
153 return actualProcessor.Process(ctx, smb, out)
154 }
155
156 //Protocol that use stored message for ReadMessageBegin
157 type storedMessageProtocol struct {
158 TProtocol
159 name string
160 typeId TMessageType
161 seqid int32
162 }
163
164 func NewStoredMessageProtocol(protocol TProtocol, name string, typeId TMessageType, seqid int32) *storedMessageProtocol {
165 return &storedMessageProtocol{protocol, name, typeId, seqid}
166 }
167
168 func (s *storedMessageProtocol) ReadMessageBegin() (name string, typeId TMessageType, seqid int32, err error) {
169 return s.name, s.typeId, s.seqid, nil
170 }