1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
19 using System.Collections.Generic;
21 using System.Threading;
22 using System.Threading.Tasks;
23 using Thrift.Protocol;
24 using Thrift.Protocol.Entities;
26 namespace Thrift.Processor
28 // ReSharper disable once InconsistentNaming
29 public class TMultiplexedProcessor : ITAsyncProcessor
// Processors keyed by service name. Entries are added by RegisterProcessor and
// looked up by ProcessAsync to route each incoming message to its service.
33 private readonly Dictionary<string, ITAsyncProcessor> _serviceProcessorMap =
34 new Dictionary<string, ITAsyncProcessor>();
/// <summary>
/// Processes a single message without cancellation support. This overload simply
/// forwards to the cancellable overload, supplying <see cref="CancellationToken.None"/>.
/// </summary>
/// <param name="iprot">Protocol the incoming message is read from.</param>
/// <param name="oprot">Protocol any reply is written to.</param>
/// <returns>Whatever the cancellable overload returns for this message.</returns>
public async Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot)
{
    var outcome = await ProcessAsync(iprot, oprot, CancellationToken.None);
    return outcome;
}
/// <summary>
/// Reads one message from <paramref name="iprot"/>, splits its name into a service
/// name and a method name at <c>TMultiplexedProtocol.Separator</c>, and hands the
/// message to the processor registered for that service. Malformed or unroutable
/// messages are answered through FailAsync with a TApplicationException.
/// </summary>
/// <param name="iprot">Protocol the incoming message is read from.</param>
/// <param name="oprot">Protocol that replies (including failures) are written to.</param>
/// <param name="cancellationToken">Token checked on entry and passed to every awaited call.</param>
// NOTE(review): several statements of this method (guard conditions, early returns,
// the construct enclosing the final `return false`) are not visible in this excerpt;
// the notes below describe only what the visible lines show.
41 public async Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot, CancellationToken cancellationToken)
// Already cancelled: surface a canceled task instead of touching the protocols.
43 if (cancellationToken.IsCancellationRequested)
45 return await Task.FromCanceled<bool>(cancellationToken);
50 var message = await iprot.ReadMessageBeginAsync(cancellationToken);
// Only CALL and ONEWAY messages can be dispatched; anything else is reported
// to the peer as InvalidMessageType.
// NOTE(review): "exType" in the message text looks like a rename artifact for "type" — confirm.
52 if ((message.Type != TMessageType.Call) && (message.Type != TMessageType.Oneway))
54 await FailAsync(oprot, message, TApplicationException.ExceptionType.InvalidMessageType,
55 "Message exType CALL or ONEWAY expected", cancellationToken);
59 // Extract the service name
60 var index = message.Name.IndexOf(TMultiplexedProtocol.Separator, StringComparison.Ordinal);
// Reports InvalidProtocol when the message name carries no service prefix.
// NOTE(review): the guard for this branch is not visible here; presumably it tests
// for a negative index (separator not found) — confirm.
// NOTE(review): the text says "TMultiplexProtocol" while the type used above is
// TMultiplexedProtocol — confirm the intended spelling.
63 await FailAsync(oprot, message, TApplicationException.ExceptionType.InvalidProtocol,
64 $"Service name not found in message name: {message.Name}. Did you forget to use a TMultiplexProtocol in your client?",
69 // Take the text before the separator as the service name and look up its processor
70 var serviceName = message.Name.Substring(0, index);
71 ITAsyncProcessor actualProcessor;
// No processor registered under this service name: report InternalError to the peer.
72 if (!_serviceProcessorMap.TryGetValue(serviceName, out actualProcessor))
74 await FailAsync(oprot, message, TApplicationException.ExceptionType.InternalError,
75 $"Service name not found: {serviceName}. Did you forget to call RegisterProcessor()?",
80 // Create a new TMessage, removing the service name
// The new name is everything after "<serviceName><Separator>"; the remaining
// constructor arguments are not visible in this excerpt.
81 var newMessage = new TMessage(
82 message.Name.Substring(serviceName.Length + TMultiplexedProtocol.Separator.Length),
86 // Dispatch processing to the stored processor
// StoredMessageProtocol wraps iprot together with the rewritten message header
// and is passed to the target processor in place of iprot.
89 actualProcessor.ProcessAsync(new StoredMessageProtocol(iprot, newMessage), oprot,
// NOTE(review): the construct this return belongs to (likely an exception handler)
// is not visible in this excerpt — confirm which failures map to `false`.
94 return false; // similar to all other processors
/// <summary>
/// Adds <paramref name="processor"/> to the processor map under
/// <paramref name="serviceName"/>. A service name can be registered only once.
/// </summary>
/// <param name="serviceName">Key under which the processor is stored.</param>
/// <param name="processor">Processor to store for that service name.</param>
/// <exception cref="InvalidOperationException">
/// The map already holds a processor for <paramref name="serviceName"/>.
/// </exception>
public void RegisterProcessor(string serviceName, ITAsyncProcessor processor)
{
    var alreadyRegistered = _serviceProcessorMap.ContainsKey(serviceName);
    if (!alreadyRegistered)
    {
        _serviceProcessorMap.Add(serviceName, processor);
        return;
    }

    // Duplicate names are rejected rather than silently replacing the existing entry.
    throw new InvalidOperationException(
        $"Processor map already contains processor with name: '{serviceName}'");
}
/// <summary>
/// Writes a <c>TApplicationException</c> reply to <paramref name="oprot"/> and flushes
/// its transport. The reply header reuses the name and sequence id of
/// <paramref name="message"/> with message type <c>TMessageType.Exception</c>.
/// </summary>
/// <param name="oprot">Protocol the exception reply is written to.</param>
/// <param name="message">Incoming message whose name and sequence id are echoed back.</param>
/// <param name="extype">Exception type placed in the reply.</param>
/// <param name="etxt">Exception text placed in the reply.</param>
/// <param name="cancellationToken">Token passed to every write and to the flush.</param>
private async Task FailAsync(TProtocol oprot, TMessage message, TApplicationException.ExceptionType extype,
    string etxt, CancellationToken cancellationToken)
{
    var replyHeader = new TMessage(message.Name, TMessageType.Exception, message.SeqID);
    var failure = new TApplicationException(extype, etxt);

    // Header, exception payload, trailer, then flush so the reply is actually sent.
    await oprot.WriteMessageBeginAsync(replyHeader, cancellationToken);
    await failure.WriteAsync(oprot, cancellationToken);
    await oprot.WriteMessageEndAsync(cancellationToken);
    await oprot.Transport.FlushAsync(cancellationToken);
}
122 private class StoredMessageProtocol : TProtocolDecorator
124 readonly TMessage _msgBegin;
126 public StoredMessageProtocol(TProtocol protocol, TMessage messageBegin)
129 _msgBegin = messageBegin;
132 public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
134 if (cancellationToken.IsCancellationRequested)
136 return await Task.FromCanceled<TMessage>(cancellationToken);