// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Thrift.Protocol;
using Thrift.Protocol.Entities;

namespace Thrift.Processor
{
    // ReSharper disable once InconsistentNaming
    /// <summary>
    /// Processor that routes each incoming message to one of several registered
    /// service processors. The service is selected by the prefix of the message
    /// name that precedes <see cref="TMultiplexedProtocol.Separator"/>.
    /// </summary>
    public class TMultiplexedProcessor : ITAsyncProcessor
    {
        //TODO: Localization

        // Maps a service name (the part of the message name before the separator)
        // to the processor registered for that service.
        private readonly Dictionary<string, ITAsyncProcessor> _serviceProcessorMap =
            new Dictionary<string, ITAsyncProcessor>();

        /// <summary>
        /// Processes one message without cancellation support; forwards to the
        /// overload taking a token, passing <see cref="CancellationToken.None"/>.
        /// </summary>
        /// <param name="iprot">Protocol the request is read from.</param>
        /// <param name="oprot">Protocol the response (or failure) is written to.</param>
        /// <returns>The result of the cancellable overload.</returns>
        public async Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot)
        {
            return await ProcessAsync(iprot, oprot, CancellationToken.None);
        }

        /// <summary>
        /// Reads the message header from <paramref name="iprot"/>, resolves the target
        /// service from the message name and dispatches to the registered processor.
        /// </summary>
        /// <param name="iprot">Protocol the request is read from.</param>
        /// <param name="oprot">Protocol the response (or failure) is written to.</param>
        /// <param name="cancellationToken">Token observed before reading and passed to all I/O calls.</param>
        /// <returns>
        /// The result of the selected processor; <c>false</c> when the message has an
        /// unexpected type, carries no service name, names an unregistered service,
        /// or an <see cref="IOException"/> occurs.
        /// </returns>
        public async Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot, CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                return await Task.FromCanceled<bool>(cancellationToken);
            }

            try
            {
                var message = await iprot.ReadMessageBeginAsync(cancellationToken);

                if ((message.Type != TMessageType.Call) && (message.Type != TMessageType.Oneway))
                {
                    await FailAsync(oprot, message, TApplicationException.ExceptionType.InvalidMessageType,
                        "Message type CALL or ONEWAY expected", cancellationToken);
                    return false;
                }

                // Locate the separator between the service name and the method name.
                var index = message.Name.IndexOf(TMultiplexedProtocol.Separator, StringComparison.Ordinal);
                if (index < 0)
                {
                    await FailAsync(oprot, message, TApplicationException.ExceptionType.InvalidProtocol,
                        $"Service name not found in message name: {message.Name}. Did you forget to use a TMultiplexedProtocol in your client?",
                        cancellationToken);
                    return false;
                }

                // Extract the service name and look up the processor registered for it.
                var serviceName = message.Name.Substring(0, index);
                if (!_serviceProcessorMap.TryGetValue(serviceName, out ITAsyncProcessor actualProcessor))
                {
                    await FailAsync(oprot, message, TApplicationException.ExceptionType.InternalError,
                        $"Service name not found: {serviceName}. Did you forget to call RegisterProcessor()?",
                        cancellationToken);
                    return false;
                }

                // Create a new TMessage with the service name and separator removed, so the
                // target processor sees the plain method name.
                var newMessage = new TMessage(
                    message.Name.Substring(serviceName.Length + TMultiplexedProtocol.Separator.Length),
                    message.Type,
                    message.SeqID);

                // Dispatch to the stored processor. The message header has already been
                // consumed from iprot, so it is replayed through StoredMessageProtocol.
                return await actualProcessor.ProcessAsync(new StoredMessageProtocol(iprot, newMessage), oprot,
                    cancellationToken);
            }
            catch (IOException)
            {
                return false; // similar to all other processors
            }
        }

        /// <summary>
        /// Registers <paramref name="processor"/> as the handler for messages addressed
        /// to <paramref name="serviceName"/>.
        /// </summary>
        /// <param name="serviceName">Service name that prefixes the message name.</param>
        /// <param name="processor">Processor that handles messages for that service.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="serviceName"/> or <paramref name="processor"/> is <c>null</c>.
        /// </exception>
        /// <exception cref="InvalidOperationException">
        /// A processor is already registered under <paramref name="serviceName"/>.
        /// </exception>
        public void RegisterProcessor(string serviceName, ITAsyncProcessor processor)
        {
            if (serviceName == null)
            {
                throw new ArgumentNullException(nameof(serviceName));
            }

            // Reject null here; otherwise it would only surface as a NullReferenceException
            // when a message for this service is dispatched.
            if (processor == null)
            {
                throw new ArgumentNullException(nameof(processor));
            }

            if (_serviceProcessorMap.ContainsKey(serviceName))
            {
                throw new InvalidOperationException(
                    $"Processor map already contains processor with name: '{serviceName}'");
            }

            _serviceProcessorMap.Add(serviceName, processor);
        }

        /// <summary>
        /// Writes a <see cref="TApplicationException"/> to <paramref name="oprot"/> as an
        /// exception message that reuses the name and sequence id of
        /// <paramref name="message"/>, then flushes the transport.
        /// </summary>
        /// <param name="oprot">Protocol the failure is written to.</param>
        /// <param name="message">The request message being answered.</param>
        /// <param name="extype">Exception type to report.</param>
        /// <param name="etxt">Exception text to report.</param>
        /// <param name="cancellationToken">Token passed to all write and flush calls.</param>
        private async Task FailAsync(TProtocol oprot, TMessage message, TApplicationException.ExceptionType extype,
            string etxt, CancellationToken cancellationToken)
        {
            var appex = new TApplicationException(extype, etxt);

            var newMessage = new TMessage(message.Name, TMessageType.Exception, message.SeqID);

            await oprot.WriteMessageBeginAsync(newMessage, cancellationToken);
            await appex.WriteAsync(oprot, cancellationToken);
            await oprot.WriteMessageEndAsync(cancellationToken);
            await oprot.Transport.FlushAsync(cancellationToken);
        }

        /// <summary>
        /// Protocol decorator whose <see cref="ReadMessageBeginAsync"/> returns a stored
        /// message header instead of reading one from the wrapped protocol.
        /// </summary>
        private sealed class StoredMessageProtocol : TProtocolDecorator
        {
            private readonly TMessage _msgBegin;

            /// <summary>
            /// Wraps <paramref name="protocol"/> and stores <paramref name="messageBegin"/>
            /// as the header to hand out.
            /// </summary>
            public StoredMessageProtocol(TProtocol protocol, TMessage messageBegin)
                : base(protocol)
            {
                _msgBegin = messageBegin;
            }

            /// <summary>
            /// Returns the stored message header, or a canceled result when
            /// <paramref name="cancellationToken"/> is already canceled.
            /// </summary>
            public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    return await Task.FromCanceled<TMessage>(cancellationToken);
                }

                return _msgBegin;
            }
        }
    }
}