2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
19 * Contains some contributions under the Thrift Software License.
20 * Please see doc/old-thrift-license.txt in the Thrift distribution for
25 using System.IO.Pipes;
26 using System.Threading;
27 using System.Security.Principal;
29 namespace Thrift.Transport
31 public class TNamedPipeServerTransport : TServerTransport
34 /// This is the address of the Pipe on the localhost.
36 private readonly string pipeAddress;
// Server-side pipe instance currently held by this transport. EnsurePipeInstance assigns it,
// and AcceptImpl resets it to null after handing the instance to a ServerTransport.
37 private NamedPipeServerStream stream = null;
// When true, EnsurePipeInstance requests PipeOptions.Asynchronous. The same flag is
// passed to every ServerTransport created in AcceptImpl.
38 private bool asyncMode = true;
// Creates a server transport for the named pipe identified by pipeAddress.
// The visible body only stores the name; the pipe instance itself is created
// later by EnsurePipeInstance.
40 public TNamedPipeServerTransport(string pipeAddress)
42 this.pipeAddress = pipeAddress;
// Listen override for the TServerTransport base class.
// NOTE(review): none of the body statements (original lines 46-49) appear in this
// listing, so nothing can be said here about what it does - confirm against the full file.
45 public override void Listen()
// Close override for the TServerTransport base class.
// NOTE(review): the body (original lines 51-65) is absent from this listing; presumably
// it releases the pipe instance held in the stream field - verify against the full file.
50 public override void Close()
// Creates the NamedPipeServerStream stored in the stream field, using the settings below.
// NOTE(review): any guard that skips creation when an instance already exists is not
// visible in this listing (original lines 67-69) - confirm against the full file.
66 private void EnsurePipeInstance()
// Bidirectional pipe.
70 var direction = PipeDirection.InOut;
// Use the maximum number of server instances the runtime allows for one pipe name.
71 var maxconn = NamedPipeServerStream.MaxAllowedServerInstances;
// Byte transmission mode.
72 var mode = PipeTransmissionMode.Byte;
// Request the Asynchronous pipe option only when asyncMode is set.
73 var options = asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
// Input and output buffer sizes passed to the NamedPipeServerStream constructor.
74 const int INBUF_SIZE = 4096;
75 const int OUTBUF_SIZE = 4096;
// Access control for the pipe. The visible arguments name the WorldSid well-known
// identity, the Read, Write, Synchronize and CreateNewInstance rights, and an Allow rule.
// NOTE(review): WorldSid is the Everyone group per the .NET WellKnownSidType docs, so this
// appears to open the pipe to every local account, including the right to create further
// instances of the same pipe name - confirm that this breadth is intended.
78 var security = new PipeSecurity();
79 security.AddAccessRule(
81 new SecurityIdentifier(WellKnownSidType.WorldSid, null),
82 PipeAccessRights.Read | PipeAccessRights.Write | PipeAccessRights.Synchronize | PipeAccessRights.CreateNewInstance,
83 System.Security.AccessControl.AccessControlType.Allow
// First attempt, using the options computed above.
89 stream = new NamedPipeServerStream(pipeAddress, direction, maxconn, mode, options, INBUF_SIZE, OUTBUF_SIZE, security);
91 catch (NotImplementedException) // Mono still does not support async, fallback to sync
// Fallback: clear the Asynchronous flag and construct the pipe again with otherwise
// identical arguments.
// NOTE(review): no visible line updates the asyncMode field on this path, yet AcceptImpl
// later passes asyncMode to ServerTransport - confirm that the full file resets it.
95 options &= (~PipeOptions.Asynchronous);
96 stream = new NamedPipeServerStream(pipeAddress, direction, maxconn, mode, options, INBUF_SIZE, OUTBUF_SIZE, security);
// Waits for one client to connect to the pipe and wraps the connected stream in a
// ServerTransport.
// Visible steps: ensure a pipe instance exists, wait for a client (either through
// BeginWaitForConnection/EndWaitForConnection or the blocking WaitForConnection), then
// create a ServerTransport around the stream and clear the stream field.
// NOTE(review): the branch selecting between the two wait styles and the return statement
// are not part of this listing; presumably asyncMode selects the path and trans is
// returned - confirm against the full file.
108 protected override TTransport AcceptImpl()
112 EnsurePipeInstance();
// NOTE(review): presumably set by the connection callback and waited on by this thread;
// the Set and WaitOne calls are not visible here. ManualResetEvent is IDisposable and
// no disposal is visible either - confirm that it is released.
116 var evt = new ManualResetEvent(false);
// Holds a failure raised inside the callback so it can be thrown from this method
// (see the throw eOuter statement below).
117 Exception eOuter = null;
119 stream.BeginWaitForConnection(asyncResult =>
124 stream.EndWaitForConnection(asyncResult);
// Records an Interrupted transport exception; the condition that selects this
// assignment is not visible in this listing - TODO confirm.
126 eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted);
// Records an Interrupted transport exception that carries the message of e and keeps e
// as its inner exception.
133 eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message, e);
141 throw eOuter; // rethrow exception
// Blocking wait for a client connection.
145 stream.WaitForConnection();
// The new ServerTransport receives the connected stream together with the asyncMode flag.
148 var trans = new ServerTransport(stream,asyncMode);
149 stream = null; // pass ownership to ServerTransport
152 catch (TTransportException)
// Reports e as a NotOpen transport exception with e as the inner exception.
// NOTE(review): the catch clause that declares e is not visible here - confirm its type.
160 throw new TTransportException(TTransportException.ExceptionType.NotOpen, e.Message, e);
164 private class ServerTransport : TTransport
// Pipe stream used by IsOpen, Read and Write; supplied through the constructor.
166 private NamedPipeServerStream stream;
// Flag supplied through the constructor.
// NOTE(review): presumably selects the BeginRead/BeginWrite paths over the blocking
// calls; the branches themselves are not visible in this listing - confirm.
167 private bool asyncMode;
// Stores the given pipe stream and async flag in the corresponding fields.
// No null check on stream is visible in this listing.
169 public ServerTransport(NamedPipeServerStream stream, bool asyncMode)
171 this.stream = stream;
172 this.asyncMode = asyncMode;
// True only when a stream is held and its IsConnected property is true.
175 public override bool IsOpen
177 get { return stream != null && stream.IsConnected; }
// Open override for the TTransport base class.
// NOTE(review): the body (original lines 181-183) is absent from this listing, so its
// behaviour cannot be described here - confirm against the full file.
180 public override void Open()
// Close override for the TTransport base class.
// NOTE(review): the body (original lines 185-189) is absent from this listing; presumably
// it closes the pipe stream - verify against the full file.
184 public override void Close()
// Reads from the pipe stream into buf, passing off and len through to the stream.
// Visible paths: an asynchronous BeginRead/EndRead pair and a direct blocking stream.Read.
// NOTE(review): the condition guarding the NotOpen throw and the choice between the two
// read paths are not part of this listing; presumably a null check on stream and the
// asyncMode flag - confirm against the full file.
190 public override int Read(byte[] buf, int off, int len)
194 throw new TTransportException(TTransportException.ExceptionType.NotOpen);
// Holds a failure raised inside the read callback so it can be thrown from this method.
199 Exception eOuter = null;
// NOTE(review): presumably set by the callback and waited on by this thread; the Set and
// WaitOne calls and any disposal of this IDisposable event are not visible - confirm.
200 var evt = new ManualResetEvent(false);
203 stream.BeginRead(buf, off, len, asyncResult =>
// Result of EndRead; retval is declared on a line that is not part of this listing.
208 retval = stream.EndRead(asyncResult);
// Records an Interrupted transport exception; the selecting condition is not visible.
210 eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted);
// Records an Interrupted transport exception carrying the message of e, with e as the
// inner exception.
217 eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message, e);
225 throw eOuter; // rethrow exception
// Blocking read that returns the result of stream.Read directly.
231 return stream.Read(buf, off, len);
// Writes data from buf to the pipe stream in chunks of at most 15 * 4096 (61440) bytes.
// Visible paths: an asynchronous BeginWrite/EndWrite pair and a direct blocking stream.Write.
// NOTE(review): the loop header, the off/len bookkeeping between chunks, the condition
// guarding the NotOpen throw and the choice between the two write paths are not part of
// this listing - confirm against the full file.
235 public override void Write(byte[] buf, int off, int len)
239 throw new TTransportException(TTransportException.ExceptionType.NotOpen);
242 // if necessary, send the data in chunks
243 // there's a system limit around 0x10000 bytes that we hit otherwise
244 // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
// Size of the first chunk: the whole payload when it fits, otherwise 61440 bytes.
245 var nBytes = Math.Min(len, 15 * 4096); // 16 would exceed the limit
// Holds a failure raised inside the write callback so it can be thrown from this method.
251 Exception eOuter = null;
// NOTE(review): presumably set by the callback and waited on by this thread; the Set and
// WaitOne calls and any disposal of this IDisposable event are not visible. If this line
// sits inside the chunk loop, one event is allocated per chunk - confirm.
252 var evt = new ManualResetEvent(false);
254 stream.BeginWrite(buf, off, nBytes, asyncResult =>
259 stream.EndWrite(asyncResult);
// Records an Interrupted transport exception; the selecting condition is not visible.
261 eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted);
// Records an Interrupted transport exception carrying the message of e, with e as the
// inner exception.
268 eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message, e);
276 throw eOuter; // rethrow exception
// Blocking write of the current chunk.
280 stream.Write(buf, off, nBytes);
// Size of the next chunk: never more than len and never more than the previous chunk.
// NOTE(review): presumably off and len are advanced on lines not shown here, so that
// len is the remaining byte count at this point - confirm.
285 nBytes = Math.Min(len, nBytes);
289 protected override void Dispose(bool disposing)