]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /** |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | * | |
19 | * Contains some contributions under the Thrift Software License. | |
20 | * Please see doc/old-thrift-license.txt in the Thrift distribution for | |
21 | * details. | |
22 | */ | |
23 | ||
24 | using System; | |
25 | using System.IO.Pipes; | |
26 | using System.Threading; | |
27 | using System.Security.Principal; | |
28 | ||
29 | namespace Thrift.Transport | |
30 | { | |
31 | public class TNamedPipeServerTransport : TServerTransport | |
32 | { | |
33 | /// <summary> | |
34 | /// This is the address of the Pipe on the localhost. | |
35 | /// </summary> | |
36 | private readonly string pipeAddress; | |
37 | private NamedPipeServerStream stream = null; | |
38 | private bool asyncMode = true; | |
39 | ||
40 | public TNamedPipeServerTransport(string pipeAddress) | |
41 | { | |
42 | this.pipeAddress = pipeAddress; | |
43 | } | |
44 | ||
45 | public override void Listen() | |
46 | { | |
47 | // nothing to do here | |
48 | } | |
49 | ||
50 | public override void Close() | |
51 | { | |
52 | if (stream != null) | |
53 | { | |
54 | try | |
55 | { | |
56 | stream.Close(); | |
57 | stream.Dispose(); | |
58 | } | |
59 | finally | |
60 | { | |
61 | stream = null; | |
62 | } | |
63 | } | |
64 | } | |
65 | ||
66 | private void EnsurePipeInstance() | |
67 | { | |
68 | if (stream == null) | |
69 | { | |
70 | var direction = PipeDirection.InOut; | |
71 | var maxconn = NamedPipeServerStream.MaxAllowedServerInstances; | |
72 | var mode = PipeTransmissionMode.Byte; | |
73 | var options = asyncMode ? PipeOptions.Asynchronous : PipeOptions.None; | |
74 | const int INBUF_SIZE = 4096; | |
75 | const int OUTBUF_SIZE = 4096; | |
76 | ||
77 | // security | |
78 | var security = new PipeSecurity(); | |
79 | security.AddAccessRule( | |
80 | new PipeAccessRule( | |
81 | new SecurityIdentifier(WellKnownSidType.WorldSid, null), | |
82 | PipeAccessRights.Read | PipeAccessRights.Write | PipeAccessRights.Synchronize | PipeAccessRights.CreateNewInstance, | |
83 | System.Security.AccessControl.AccessControlType.Allow | |
84 | ) | |
85 | ); | |
86 | ||
87 | try | |
88 | { | |
89 | stream = new NamedPipeServerStream(pipeAddress, direction, maxconn, mode, options, INBUF_SIZE, OUTBUF_SIZE, security); | |
90 | } | |
91 | catch (NotImplementedException) // Mono still does not support async, fallback to sync | |
92 | { | |
93 | if (asyncMode) | |
94 | { | |
95 | options &= (~PipeOptions.Asynchronous); | |
96 | stream = new NamedPipeServerStream(pipeAddress, direction, maxconn, mode, options, INBUF_SIZE, OUTBUF_SIZE, security); | |
97 | asyncMode = false; | |
98 | } | |
99 | else | |
100 | { | |
101 | throw; | |
102 | } | |
103 | } | |
104 | ||
105 | } | |
106 | } | |
107 | ||
108 | protected override TTransport AcceptImpl() | |
109 | { | |
110 | try | |
111 | { | |
112 | EnsurePipeInstance(); | |
113 | ||
114 | if (asyncMode) | |
115 | { | |
116 | var evt = new ManualResetEvent(false); | |
117 | Exception eOuter = null; | |
118 | ||
119 | stream.BeginWaitForConnection(asyncResult => | |
120 | { | |
121 | try | |
122 | { | |
123 | if (stream != null) | |
124 | stream.EndWaitForConnection(asyncResult); | |
125 | else | |
126 | eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted); | |
127 | } | |
128 | catch (Exception e) | |
129 | { | |
130 | if (stream != null) | |
131 | eOuter = e; | |
132 | else | |
133 | eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message, e); | |
134 | } | |
135 | evt.Set(); | |
136 | }, null); | |
137 | ||
138 | evt.WaitOne(); | |
139 | ||
140 | if (eOuter != null) | |
141 | throw eOuter; // rethrow exception | |
142 | } | |
143 | else | |
144 | { | |
145 | stream.WaitForConnection(); | |
146 | } | |
147 | ||
148 | var trans = new ServerTransport(stream,asyncMode); | |
149 | stream = null; // pass ownership to ServerTransport | |
150 | return trans; | |
151 | } | |
152 | catch (TTransportException) | |
153 | { | |
154 | Close(); | |
155 | throw; | |
156 | } | |
157 | catch (Exception e) | |
158 | { | |
159 | Close(); | |
160 | throw new TTransportException(TTransportException.ExceptionType.NotOpen, e.Message, e); | |
161 | } | |
162 | } | |
163 | ||
164 | private class ServerTransport : TTransport | |
165 | { | |
166 | private NamedPipeServerStream stream; | |
167 | private bool asyncMode; | |
168 | ||
169 | public ServerTransport(NamedPipeServerStream stream, bool asyncMode) | |
170 | { | |
171 | this.stream = stream; | |
172 | this.asyncMode = asyncMode; | |
173 | } | |
174 | ||
175 | public override bool IsOpen | |
176 | { | |
177 | get { return stream != null && stream.IsConnected; } | |
178 | } | |
179 | ||
180 | public override void Open() | |
181 | { | |
182 | } | |
183 | ||
184 | public override void Close() | |
185 | { | |
186 | if (stream != null) | |
187 | stream.Close(); | |
188 | } | |
189 | ||
190 | public override int Read(byte[] buf, int off, int len) | |
191 | { | |
192 | if (stream == null) | |
193 | { | |
194 | throw new TTransportException(TTransportException.ExceptionType.NotOpen); | |
195 | } | |
196 | ||
197 | if (asyncMode) | |
198 | { | |
199 | Exception eOuter = null; | |
200 | var evt = new ManualResetEvent(false); | |
201 | int retval = 0; | |
202 | ||
203 | stream.BeginRead(buf, off, len, asyncResult => | |
204 | { | |
205 | try | |
206 | { | |
207 | if (stream != null) | |
208 | retval = stream.EndRead(asyncResult); | |
209 | else | |
210 | eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted); | |
211 | } | |
212 | catch (Exception e) | |
213 | { | |
214 | if (stream != null) | |
215 | eOuter = e; | |
216 | else | |
217 | eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message, e); | |
218 | } | |
219 | evt.Set(); | |
220 | }, null); | |
221 | ||
222 | evt.WaitOne(); | |
223 | ||
224 | if (eOuter != null) | |
225 | throw eOuter; // rethrow exception | |
226 | else | |
227 | return retval; | |
228 | } | |
229 | else | |
230 | { | |
231 | return stream.Read(buf, off, len); | |
232 | } | |
233 | } | |
234 | ||
235 | public override void Write(byte[] buf, int off, int len) | |
236 | { | |
237 | if (stream == null) | |
238 | { | |
239 | throw new TTransportException(TTransportException.ExceptionType.NotOpen); | |
240 | } | |
241 | ||
242 | // if necessary, send the data in chunks | |
243 | // there's a system limit around 0x10000 bytes that we hit otherwise | |
244 | // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section." | |
245 | var nBytes = Math.Min(len, 15 * 4096); // 16 would exceed the limit | |
246 | while (nBytes > 0) | |
247 | { | |
248 | ||
249 | if (asyncMode) | |
250 | { | |
251 | Exception eOuter = null; | |
252 | var evt = new ManualResetEvent(false); | |
253 | ||
254 | stream.BeginWrite(buf, off, nBytes, asyncResult => | |
255 | { | |
256 | try | |
257 | { | |
258 | if (stream != null) | |
259 | stream.EndWrite(asyncResult); | |
260 | else | |
261 | eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted); | |
262 | } | |
263 | catch (Exception e) | |
264 | { | |
265 | if (stream != null) | |
266 | eOuter = e; | |
267 | else | |
268 | eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message, e); | |
269 | } | |
270 | evt.Set(); | |
271 | }, null); | |
272 | ||
273 | evt.WaitOne(); | |
274 | ||
275 | if (eOuter != null) | |
276 | throw eOuter; // rethrow exception | |
277 | } | |
278 | else | |
279 | { | |
280 | stream.Write(buf, off, nBytes); | |
281 | } | |
282 | ||
283 | off += nBytes; | |
284 | len -= nBytes; | |
285 | nBytes = Math.Min(len, nBytes); | |
286 | } | |
287 | } | |
288 | ||
289 | protected override void Dispose(bool disposing) | |
290 | { | |
291 | if (stream != null) | |
292 | stream.Dispose(); | |
293 | } | |
294 | } | |
295 | } | |
296 | } |