1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations under the License.
19 using System.IO.Pipes;
20 using System.Threading;
21 using System.Threading.Tasks;
23 namespace Thrift.Transports.Server
25 // ReSharper disable once InconsistentNaming
// Server-side transport that accepts client connections over a named pipe
// (System.IO.Pipes.NamedPipeServerStream) and derives from TServerTransport.
26 public class TNamedPipeServerTransport : TServerTransport
29 /// This is the address of the Pipe on the localhost. It is assigned once in the constructor and passed as the pipe name when the NamedPipeServerStream is created.
31 private readonly string _pipeAddress;
// When true, EnsurePipeInstance requests PipeOptions.Asynchronous; otherwise PipeOptions.None.
33 private bool _asyncMode = true;
// NOTE(review): initialised to true; the only write visible in this listing is the
// commented-out "_isPending = false" in AcceptImplementationAsync. Presumably this is
// what IsClientPending reports - confirm against the full file.
34 private volatile bool _isPending = true;
// Pipe instance used for the next client. Assigned in EnsurePipeInstance and reset to
// null in AcceptImplementationAsync once the stream has been handed to a ServerTransport.
36 private NamedPipeServerStream _stream = null;
// Creates a server transport for the pipe identified by pipeAddress.
// The visible body only stores the address; the pipe stream itself is created later
// by EnsurePipeInstance.
38 public TNamedPipeServerTransport(string pipeAddress)
40 _pipeAddress = pipeAddress;
// Overrides the base transport's Listen.
// NOTE(review): the method body is not visible in this listing. The only visible code
// that creates the pipe stream is EnsurePipeInstance, which is called from
// AcceptImplementationAsync - confirm against the full file what, if anything, happens here.
43 public override void Listen()
// Overrides the base transport's Close.
// NOTE(review): apart from the TODO below, the body is not visible in this listing;
// presumably it releases the pending _stream - confirm against the full file.
48 public override void Close()
54 //TODO: check for disconnection
// Overrides the base transport's IsClientPending and returns a bool.
// NOTE(review): the method body is not visible in this listing; presumably it returns
// the _isPending field - confirm against the full file.
66 public override bool IsClientPending()
// Creates the NamedPipeServerStream stored in _stream: bidirectional (InOut), byte
// transmission mode, and asynchronous when _asyncMode is set.
// maxconn, inbuf and outbuf are defined on lines that are not visible in this listing.
// NOTE(review): any guard that skips creation when _stream already exists is not
// visible here either - confirm against the full file.
71 private void EnsurePipeInstance()
75 var direction = PipeDirection.InOut;
77 var mode = PipeTransmissionMode.Byte;
78 var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
// First attempt, using the options selected above.
85 _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
87 catch (NotImplementedException) // Mono still does not support async, fallback to sync
// Fallback: clear the Asynchronous flag and create the pipe again with the same
// visible arguments.
91 options &= (~PipeOptions.Asynchronous);
92 _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf,
// Waits for the next client to connect to the pipe and wraps the connected stream in a
// ServerTransport.
// NOTE(review): the return statement is not visible in this listing; presumably the
// method returns "trans" - confirm against the full file.
104 protected override async Task<TClientTransport> AcceptImplementationAsync(CancellationToken cancellationToken)
// Create the pipe instance (EnsurePipeInstance assigns _stream) before waiting on it.
108 EnsurePipeInstance();
// Asynchronously wait for a client; cancellationToken is passed through to the wait.
110 await _stream.WaitForConnectionAsync(cancellationToken);
112 var trans = new ServerTransport(_stream);
113 _stream = null; // pass ownership to ServerTransport
115 //_isPending = false;
// NOTE(review): the handler body is not visible; presumably transport exceptions are
// rethrown unchanged - confirm.
119 catch (TTransportException)
// Another exception "e" (its catch clause is not visible in this listing) is reported
// as a NotOpen TTransportException that carries only the original message.
127 throw new TTransportException(TTransportException.ExceptionType.NotOpen, e.Message);
// Client transport over a NamedPipeServerStream. AcceptImplementationAsync constructs it
// with the stream it has just waited for a connection on.
131 private class ServerTransport : TClientTransport
// The pipe stream this transport reads from and writes to.
133 private readonly NamedPipeServerStream _stream;
// NOTE(review): the constructor body is not visible in this listing; presumably it
// stores "stream" in _stream - confirm.
135 public ServerTransport(NamedPipeServerStream stream)
// True only when a stream is present and that stream reports IsConnected.
140 public override bool IsOpen => _stream != null && _stream.IsConnected;
// Open operation for this transport. The only visible work is cancellation handling:
// if cancellation has already been requested, a canceled task is awaited, so the await
// throws a cancellation exception. No other statements are visible in this listing.
142 public override async Task OpenAsync(CancellationToken cancellationToken)
144 if (cancellationToken.IsCancellationRequested)
146 await Task.FromCanceled(cancellationToken);
// Overrides the base client transport's Close.
// NOTE(review): the method body is not visible in this listing; presumably it closes or
// disposes _stream - confirm against the full file.
150 public override void Close()
// Reads from the pipe into buffer, starting at offset, for at most length bytes, and
// returns the byte count reported by the stream. cancellationToken is passed to the
// underlying stream read.
155 public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
156 CancellationToken cancellationToken)
// NOTE(review): the condition guarding this throw is not visible in this listing;
// presumably it fires when _stream is null - confirm.
160 throw new TTransportException(TTransportException.ExceptionType.NotOpen);
163 return await _stream.ReadAsync(buffer, offset, length, cancellationToken);
// Writes length bytes from buffer, starting at offset, to the pipe. cancellationToken
// is passed to the underlying stream write.
166 public override async Task WriteAsync(byte[] buffer, int offset, int length,
167 CancellationToken cancellationToken)
// NOTE(review): the condition guarding this throw is not visible in this listing;
// presumably it fires when _stream is null - confirm.
171 throw new TTransportException(TTransportException.ExceptionType.NotOpen);
174 await _stream.WriteAsync(buffer, offset, length, cancellationToken);
// Flush operation for this transport. The only visible work is cancellation handling:
// if cancellation has already been requested, a canceled task is awaited, so the await
// throws a cancellation exception. No call that flushes _stream is visible in this listing.
177 public override async Task FlushAsync(CancellationToken cancellationToken)
179 if (cancellationToken.IsCancellationRequested)
181 await Task.FromCanceled(cancellationToken);
185 protected override void Dispose(bool disposing)