]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | using System; | |
19 | using System.Collections.Generic; | |
20 | using System.IO; | |
21 | using System.Threading; | |
22 | using System.Threading.Tasks; | |
23 | using Thrift.Protocol; | |
24 | using Thrift.Protocol.Entities; | |
25 | ||
26 | namespace Thrift.Processor | |
27 | { | |
// ReSharper disable once InconsistentNaming
/// <summary>
/// Processor that dispatches each incoming call to one of several registered
/// service processors, selected by the service name embedded in the message name.
/// </summary>
/// <remarks>
/// The incoming message name is expected to be
/// <c>serviceName + TMultiplexedProtocol.Separator + methodName</c>. The service
/// prefix and separator are stripped before the call is handed to the registered
/// processor. The processor map is a plain <see cref="Dictionary{TKey,TValue}"/>
/// with no synchronization; presumably all registrations happen before requests
/// are served (TODO confirm against callers).
/// </remarks>
public class TMultiplexedProcessor : ITAsyncProcessor
{
    //TODO: Localization

    private readonly Dictionary<string, ITAsyncProcessor> _serviceProcessorMap =
        new Dictionary<string, ITAsyncProcessor>();

    /// <summary>
    /// Processes one message without cancellation support.
    /// </summary>
    /// <param name="iprot">Protocol the request is read from.</param>
    /// <param name="oprot">Protocol the response (or failure reply) is written to.</param>
    /// <returns>See the overload taking a <see cref="CancellationToken"/>.</returns>
    public async Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot)
    {
        return await ProcessAsync(iprot, oprot, CancellationToken.None);
    }

    /// <summary>
    /// Reads the message header, resolves the target service processor and
    /// forwards the call to it with the service prefix removed from the message name.
    /// </summary>
    /// <param name="iprot">Protocol the request is read from.</param>
    /// <param name="oprot">Protocol the response (or failure reply) is written to.</param>
    /// <param name="cancellationToken">Token observed before processing and passed to all I/O calls.</param>
    /// <returns>
    /// The result of the selected processor; <c>false</c> when an exception reply
    /// was sent instead (wrong message type, missing service name, unknown service)
    /// or when an <see cref="IOException"/> occurred.
    /// </returns>
    public async Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            // Awaiting a canceled task surfaces the cancellation as an exception to the caller.
            return await Task.FromCanceled<bool>(cancellationToken);
        }

        try
        {
            var message = await iprot.ReadMessageBeginAsync(cancellationToken);

            // Only requests can be dispatched; anything else is answered with an exception reply.
            if ((message.Type != TMessageType.Call) && (message.Type != TMessageType.Oneway))
            {
                await FailAsync(oprot, message, TApplicationException.ExceptionType.InvalidMessageType,
                    "Message type CALL or ONEWAY expected", cancellationToken);
                return false;
            }

            // Extract the service name
            var index = message.Name.IndexOf(TMultiplexedProtocol.Separator, StringComparison.Ordinal);
            if (index < 0)
            {
                await FailAsync(oprot, message, TApplicationException.ExceptionType.InvalidProtocol,
                    $"Service name not found in message name: {message.Name}. Did you forget to use a TMultiplexedProtocol in your client?",
                    cancellationToken);
                return false;
            }

            // Look up the processor registered for that service
            var serviceName = message.Name.Substring(0, index);
            if (!_serviceProcessorMap.TryGetValue(serviceName, out var actualProcessor))
            {
                await FailAsync(oprot, message, TApplicationException.ExceptionType.InternalError,
                    $"Service name not found: {serviceName}. Did you forget to call RegisterProcessor()?",
                    cancellationToken);
                return false;
            }

            // Create a new TMessage, removing the service name and the separator
            var newMessage = new TMessage(
                message.Name.Substring(serviceName.Length + TMultiplexedProtocol.Separator.Length),
                message.Type,
                message.SeqID);

            // Dispatch processing to the stored processor. The message header has already
            // been consumed from iprot, so the rewritten header is replayed via the decorator.
            return
                await
                    actualProcessor.ProcessAsync(new StoredMessageProtocol(iprot, newMessage), oprot,
                        cancellationToken);
        }
        catch (IOException)
        {
            return false; // similar to all other processors
        }
    }

    /// <summary>
    /// Registers the processor that handles calls addressed to <paramref name="serviceName"/>.
    /// </summary>
    /// <param name="serviceName">Service name used as the prefix of incoming message names.</param>
    /// <param name="processor">Processor that receives the calls for that service.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="serviceName"/> or <paramref name="processor"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// A processor is already registered under <paramref name="serviceName"/>.
    /// </exception>
    public void RegisterProcessor(string serviceName, ITAsyncProcessor processor)
    {
        if (serviceName == null)
        {
            throw new ArgumentNullException(nameof(serviceName));
        }

        // Reject null here instead of failing with a NullReferenceException at dispatch time.
        if (processor == null)
        {
            throw new ArgumentNullException(nameof(processor));
        }

        if (_serviceProcessorMap.ContainsKey(serviceName))
        {
            throw new InvalidOperationException(
                $"Processor map already contains processor with name: '{serviceName}'");
        }

        _serviceProcessorMap.Add(serviceName, processor);
    }

    /// <summary>
    /// Writes a <see cref="TApplicationException"/> reply for <paramref name="message"/>
    /// to <paramref name="oprot"/> and flushes the underlying transport.
    /// </summary>
    /// <param name="oprot">Protocol the failure reply is written to.</param>
    /// <param name="message">Original message; its name and sequence id are reused in the reply.</param>
    /// <param name="extype">Exception type reported to the client.</param>
    /// <param name="etxt">Exception text reported to the client.</param>
    /// <param name="cancellationToken">Token passed to all write and flush calls.</param>
    private async Task FailAsync(TProtocol oprot, TMessage message, TApplicationException.ExceptionType extype,
        string etxt, CancellationToken cancellationToken)
    {
        var appex = new TApplicationException(extype, etxt);

        var newMessage = new TMessage(message.Name, TMessageType.Exception, message.SeqID);

        await oprot.WriteMessageBeginAsync(newMessage, cancellationToken);
        await appex.WriteAsync(oprot, cancellationToken);
        await oprot.WriteMessageEndAsync(cancellationToken);
        await oprot.Transport.FlushAsync(cancellationToken);
    }

    /// <summary>
    /// Protocol decorator whose <see cref="ReadMessageBeginAsync"/> returns a message
    /// header supplied at construction instead of reading one from the wrapped protocol.
    /// </summary>
    private class StoredMessageProtocol : TProtocolDecorator
    {
        readonly TMessage _msgBegin;

        /// <summary>
        /// Wraps <paramref name="protocol"/> and stores <paramref name="messageBegin"/> for later replay.
        /// </summary>
        /// <param name="protocol">Protocol passed to the decorator base class.</param>
        /// <param name="messageBegin">Message header returned by <see cref="ReadMessageBeginAsync"/>.</param>
        public StoredMessageProtocol(TProtocol protocol, TMessage messageBegin)
            : base(protocol)
        {
            _msgBegin = messageBegin;
        }

        /// <summary>
        /// Returns the stored message header; nothing is read from the wrapped protocol.
        /// </summary>
        /// <param name="cancellationToken">Token checked before the stored header is returned.</param>
        /// <returns>The message header given to the constructor.</returns>
        public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                return await Task.FromCanceled<TMessage>(cancellationToken);
            }

            return _msgBegin;
        }
    }
}
143 | } |