]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/TMultiplexedProcessor.cs
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / jaegertracing / thrift / lib / netstd / Thrift / Processor / TMultiplexedProcessor.cs
CommitLineData
f67539c2
TL
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18using System;
19using System.Collections.Generic;
20using System.IO;
21using System.Threading;
22using System.Threading.Tasks;
23using Thrift.Protocol;
24using Thrift.Protocol.Entities;
25
namespace Thrift.Processor
{
    /// <summary>
    /// Processor that dispatches each incoming call to one of several registered
    /// processors. The target is selected by the service name that precedes
    /// <see cref="TMultiplexedProtocol.Separator"/> in the incoming message name.
    /// </summary>
    // ReSharper disable once InconsistentNaming
    public class TMultiplexedProcessor : ITAsyncProcessor
    {
        //TODO: Localization

        // Service name -> processor that handles calls addressed to that service.
        private readonly Dictionary<string, ITAsyncProcessor> _serviceProcessorMap =
            new Dictionary<string, ITAsyncProcessor>();

        /// <summary>
        /// Processes one message without cancellation support; forwards to the
        /// overload taking a token, passing <see cref="CancellationToken.None"/>.
        /// </summary>
        /// <param name="iprot">Protocol the request is read from.</param>
        /// <param name="oprot">Protocol the reply is written to.</param>
        /// <returns>The result of the cancellable overload.</returns>
        public async Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot)
        {
            return await ProcessAsync(iprot, oprot, CancellationToken.None);
        }

        /// <summary>
        /// Reads the message header from <paramref name="iprot"/>, resolves the target
        /// service from the message name and hands the call to the processor registered
        /// for that service.
        /// </summary>
        /// <param name="iprot">Protocol the request is read from.</param>
        /// <param name="oprot">Protocol the reply (or an exception reply) is written to.</param>
        /// <param name="cancellationToken">Token observed before reading and passed to every awaited call.</param>
        /// <returns>
        /// The delegated processor's result. <c>false</c> when the message type is neither
        /// Call nor Oneway, when the name contains no service separator, or when no processor
        /// is registered for the service (an exception reply is written to
        /// <paramref name="oprot"/> in those three cases), and also when an
        /// <see cref="IOException"/> is raised while processing.
        /// </returns>
        public async Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot, CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                return await Task.FromCanceled<bool>(cancellationToken);
            }

            try
            {
                var message = await iprot.ReadMessageBeginAsync(cancellationToken);

                var isCallOrOneway = message.Type == TMessageType.Call || message.Type == TMessageType.Oneway;
                if (!isCallOrOneway)
                {
                    await FailAsync(oprot, message, TApplicationException.ExceptionType.InvalidMessageType,
                        "Message exType CALL or ONEWAY expected", cancellationToken);
                    return false;
                }

                // The message name is expected to be "<service><separator><method>".
                var separator = TMultiplexedProtocol.Separator;
                var separatorIndex = message.Name.IndexOf(separator, StringComparison.Ordinal);
                if (separatorIndex < 0)
                {
                    await FailAsync(oprot, message, TApplicationException.ExceptionType.InvalidProtocol,
                        $"Service name not found in message name: {message.Name}. Did you forget to use a TMultiplexProtocol in your client?",
                        cancellationToken);
                    return false;
                }

                // Look up the processor registered for the extracted service name.
                var serviceName = message.Name.Substring(0, separatorIndex);
                if (!_serviceProcessorMap.TryGetValue(serviceName, out var actualProcessor))
                {
                    await FailAsync(oprot, message, TApplicationException.ExceptionType.InternalError,
                        $"Service name not found: {serviceName}. Did you forget to call RegisterProcessor()?",
                        cancellationToken);
                    return false;
                }

                // Rebuild the message header without the service prefix so the target
                // processor sees only the bare method name.
                var methodName = message.Name.Substring(separatorIndex + separator.Length);
                var strippedMessage = new TMessage(methodName, message.Type, message.SeqID);

                // The header has already been consumed from iprot, so the target processor
                // gets a wrapper that replays the rewritten header instead.
                var replayingProtocol = new StoredMessageProtocol(iprot, strippedMessage);
                return await actualProcessor.ProcessAsync(replayingProtocol, oprot, cancellationToken);
            }
            catch (IOException)
            {
                return false; // similar to all other processors
            }
        }

        /// <summary>
        /// Registers the processor that will handle calls addressed to
        /// <paramref name="serviceName"/>.
        /// </summary>
        /// <param name="serviceName">Service name clients use as the message-name prefix.</param>
        /// <param name="processor">Processor that handles calls for that service.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="serviceName"/> or <paramref name="processor"/> is <c>null</c>.
        /// </exception>
        /// <exception cref="InvalidOperationException">
        /// A processor is already registered under <paramref name="serviceName"/>.
        /// </exception>
        public void RegisterProcessor(string serviceName, ITAsyncProcessor processor)
        {
            if (serviceName == null)
            {
                throw new ArgumentNullException(nameof(serviceName));
            }

            // Reject null here; otherwise it would only surface later as a
            // NullReferenceException when a call is dispatched to this service.
            if (processor == null)
            {
                throw new ArgumentNullException(nameof(processor));
            }

            if (_serviceProcessorMap.ContainsKey(serviceName))
            {
                throw new InvalidOperationException(
                    $"Processor map already contains processor with name: '{serviceName}'");
            }

            _serviceProcessorMap.Add(serviceName, processor);
        }

        /// <summary>
        /// Writes a <see cref="TApplicationException"/> reply to <paramref name="oprot"/>,
        /// reusing the name and sequence id of <paramref name="message"/>, then flushes
        /// the output transport.
        /// </summary>
        /// <param name="oprot">Protocol the exception reply is written to.</param>
        /// <param name="message">Incoming message header the reply refers to.</param>
        /// <param name="extype">Exception type reported to the client.</param>
        /// <param name="etxt">Exception text reported to the client.</param>
        /// <param name="cancellationToken">Token passed to every write and to the flush.</param>
        private async Task FailAsync(TProtocol oprot, TMessage message, TApplicationException.ExceptionType extype,
            string etxt, CancellationToken cancellationToken)
        {
            var exception = new TApplicationException(extype, etxt);
            var replyHeader = new TMessage(message.Name, TMessageType.Exception, message.SeqID);

            await oprot.WriteMessageBeginAsync(replyHeader, cancellationToken);
            await exception.WriteAsync(oprot, cancellationToken);
            await oprot.WriteMessageEndAsync(cancellationToken);
            await oprot.Transport.FlushAsync(cancellationToken);
        }

        /// <summary>
        /// Protocol wrapper whose <see cref="ReadMessageBeginAsync"/> returns a message
        /// header supplied at construction instead of reading one from the wrapped protocol.
        /// NOTE(review): all other operations are presumably forwarded by
        /// TProtocolDecorator - confirm against that class.
        /// </summary>
        private class StoredMessageProtocol : TProtocolDecorator
        {
            private readonly TMessage _msgBegin;

            /// <summary>Wraps <paramref name="protocol"/> and stores the header to replay.</summary>
            /// <param name="protocol">Protocol being wrapped.</param>
            /// <param name="messageBegin">Header returned by <see cref="ReadMessageBeginAsync"/>.</param>
            public StoredMessageProtocol(TProtocol protocol, TMessage messageBegin)
                : base(protocol)
            {
                _msgBegin = messageBegin;
            }

            /// <summary>Returns the stored header, or a canceled result if cancellation was requested.</summary>
            /// <param name="cancellationToken">Token checked before the header is returned.</param>
            /// <returns>The message header given to the constructor.</returns>
            public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    return await Task.FromCanceled<TMessage>(cancellationToken);
                }

                return _msgBegin;
            }
        }
    }
}