]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/TMultiplexedProcessor.cs
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / jaegertracing / thrift / lib / netstd / Thrift / Processor / TMultiplexedProcessor.cs
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 using System;
19 using System.Collections.Generic;
20 using System.IO;
21 using System.Threading;
22 using System.Threading.Tasks;
23 using Thrift.Protocol;
24 using Thrift.Protocol.Entities;
25
26 namespace Thrift.Processor
27 {
28 // ReSharper disable once InconsistentNaming
/// <summary>
/// An <see cref="ITAsyncProcessor"/> that hosts several named service processors and
/// dispatches each incoming message to one of them. The target service is taken from the
/// part of the message name that precedes <see cref="TMultiplexedProtocol.Separator"/>.
/// </summary>
public class TMultiplexedProcessor : ITAsyncProcessor
{
    //TODO: Localization

    // Registered processors, keyed by the service name used as the message-name prefix.
    private readonly Dictionary<string, ITAsyncProcessor> _serviceProcessorMap =
        new Dictionary<string, ITAsyncProcessor>();

    /// <summary>
    /// Processes a single message without cancellation support.
    /// </summary>
    /// <param name="iprot">Protocol the request is read from.</param>
    /// <param name="oprot">Protocol the response (or failure) is written to.</param>
    /// <returns>The result of the cancellable overload called with <see cref="CancellationToken.None"/>.</returns>
    public async Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot)
    {
        return await ProcessAsync(iprot, oprot, CancellationToken.None);
    }

    /// <summary>
    /// Reads the message header from <paramref name="iprot"/>, resolves the registered
    /// processor from the service-name prefix and forwards the call to it with the prefix
    /// removed from the message name.
    /// </summary>
    /// <param name="iprot">Protocol the request is read from.</param>
    /// <param name="oprot">Protocol the response (or failure) is written to.</param>
    /// <param name="cancellationToken">Token checked on entry and passed to every awaited call.</param>
    /// <returns>
    /// The value returned by the resolved processor; <c>false</c> when the message type is
    /// neither Call nor Oneway, when the name carries no separator, when no processor is
    /// registered under the service name, or when an <see cref="IOException"/> is caught.
    /// </returns>
    public async Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<bool>(cancellationToken);
        }

        try
        {
            var message = await iprot.ReadMessageBeginAsync(cancellationToken);

            if ((message.Type != TMessageType.Call) && (message.Type != TMessageType.Oneway))
            {
                await FailAsync(oprot, message, TApplicationException.ExceptionType.InvalidMessageType,
                    "Message type CALL or ONEWAY expected", cancellationToken);
                return false;
            }

            // Locate the separator between the service name and the method name
            var index = message.Name.IndexOf(TMultiplexedProtocol.Separator, StringComparison.Ordinal);
            if (index < 0)
            {
                await FailAsync(oprot, message, TApplicationException.ExceptionType.InvalidProtocol,
                    $"Service name not found in message name: {message.Name}. Did you forget to use a TMultiplexedProtocol in your client?",
                    cancellationToken);
                return false;
            }

            // Extract the service name and look up the processor registered for it
            var serviceName = message.Name.Substring(0, index);
            if (!_serviceProcessorMap.TryGetValue(serviceName, out var actualProcessor))
            {
                await FailAsync(oprot, message, TApplicationException.ExceptionType.InternalError,
                    $"Service name not found: {serviceName}. Did you forget to call RegisterProcessor()?",
                    cancellationToken);
                return false;
            }

            // Create a new TMessage, removing the service name and the separator
            var newMessage = new TMessage(
                message.Name.Substring(serviceName.Length + TMultiplexedProtocol.Separator.Length),
                message.Type,
                message.SeqID);

            // Dispatch processing to the stored processor; the header was already consumed
            // from iprot, so it is replayed through StoredMessageProtocol
            return
                await
                    actualProcessor.ProcessAsync(new StoredMessageProtocol(iprot, newMessage), oprot,
                        cancellationToken);
        }
        catch (IOException)
        {
            return false; // similar to all other processors
        }
    }

    /// <summary>
    /// Registers <paramref name="processor"/> under <paramref name="serviceName"/>.
    /// </summary>
    /// <param name="serviceName">Service name expected as the message-name prefix.</param>
    /// <param name="processor">Processor that receives messages addressed to that service.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="serviceName"/> or <paramref name="processor"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// A processor is already registered under <paramref name="serviceName"/>.
    /// </exception>
    public void RegisterProcessor(string serviceName, ITAsyncProcessor processor)
    {
        if (serviceName == null)
        {
            throw new ArgumentNullException(nameof(serviceName));
        }

        // Reject null here; otherwise the failure only surfaces later as a
        // NullReferenceException when a message is dispatched to this service.
        if (processor == null)
        {
            throw new ArgumentNullException(nameof(processor));
        }

        if (_serviceProcessorMap.ContainsKey(serviceName))
        {
            throw new InvalidOperationException(
                $"Processor map already contains processor with name: '{serviceName}'");
        }

        _serviceProcessorMap.Add(serviceName, processor);
    }

    /// <summary>
    /// Writes a <see cref="TApplicationException"/> to <paramref name="oprot"/> as an
    /// Exception-type message that reuses the name and sequence id of
    /// <paramref name="message"/>, then flushes the output transport.
    /// </summary>
    /// <param name="oprot">Protocol the failure is written to.</param>
    /// <param name="message">The request header being answered.</param>
    /// <param name="extype">Exception type reported to the client.</param>
    /// <param name="etxt">Exception text reported to the client.</param>
    /// <param name="cancellationToken">Token passed to every write and to the flush.</param>
    private async Task FailAsync(TProtocol oprot, TMessage message, TApplicationException.ExceptionType extype,
        string etxt, CancellationToken cancellationToken)
    {
        var appex = new TApplicationException(extype, etxt);

        var newMessage = new TMessage(message.Name, TMessageType.Exception, message.SeqID);

        await oprot.WriteMessageBeginAsync(newMessage, cancellationToken);
        await appex.WriteAsync(oprot, cancellationToken);
        await oprot.WriteMessageEndAsync(cancellationToken);
        await oprot.Transport.FlushAsync(cancellationToken);
    }

    /// <summary>
    /// Protocol wrapper whose <see cref="ReadMessageBeginAsync"/> returns a message header
    /// supplied at construction instead of reading one.
    /// NOTE(review): all other calls presumably go to the wrapped protocol via
    /// TProtocolDecorator - confirm against that class.
    /// </summary>
    private class StoredMessageProtocol : TProtocolDecorator
    {
        // Header handed back by ReadMessageBeginAsync.
        private readonly TMessage _msgBegin;

        /// <summary>
        /// Wraps <paramref name="protocol"/> and stores <paramref name="messageBegin"/> for replay.
        /// </summary>
        public StoredMessageProtocol(TProtocol protocol, TMessage messageBegin)
            : base(protocol)
        {
            _msgBegin = messageBegin;
        }

        /// <summary>
        /// Returns the stored message header, or a canceled result when cancellation has
        /// already been requested.
        /// </summary>
        public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                return await Task.FromCanceled<TMessage>(cancellationToken);
            }

            return _msgBegin;
        }
    }
}
143 }