// Licensed to the Apache Software Foundation(ASF) under one // or more contributor license agreements.See the NOTICE file // distributed with this work for additional information // regarding copyright ownership.The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. using System; using System.IO.Pipes; using System.Threading; using System.Threading.Tasks; namespace Thrift.Transports.Server { // ReSharper disable once InconsistentNaming public class TNamedPipeServerTransport : TServerTransport { /// /// This is the address of the Pipe on the localhost. /// private readonly string _pipeAddress; private bool _asyncMode = true; private volatile bool _isPending = true; private NamedPipeServerStream _stream = null; public TNamedPipeServerTransport(string pipeAddress) { _pipeAddress = pipeAddress; } public override void Listen() { // nothing to do here } public override void Close() { if (_stream != null) { try { //TODO: check for disconection _stream.Disconnect(); _stream.Dispose(); } finally { _stream = null; _isPending = false; } } } public override bool IsClientPending() { return _isPending; } private void EnsurePipeInstance() { if (_stream == null) { var direction = PipeDirection.InOut; var maxconn = 254; var mode = PipeTransmissionMode.Byte; var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None; var inbuf = 4096; var outbuf = 4096; // TODO: security try { _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf); } catch (NotImplementedException) // Mono still does not support async, fallback to sync { if (_asyncMode) { options &= (~PipeOptions.Asynchronous); _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf); _asyncMode = false; } else { throw; } } } } protected override async Task AcceptImplementationAsync(CancellationToken cancellationToken) { try { EnsurePipeInstance(); await _stream.WaitForConnectionAsync(cancellationToken); var trans = new ServerTransport(_stream); _stream = null; // pass ownership to ServerTransport //_isPending = false; return trans; } catch (TTransportException) { Close(); throw; } catch (Exception e) { Close(); throw new TTransportException(TTransportException.ExceptionType.NotOpen, e.Message); } } private class ServerTransport : TClientTransport { private readonly NamedPipeServerStream _stream; public ServerTransport(NamedPipeServerStream stream) { _stream = stream; } public override bool IsOpen => _stream != null && _stream.IsConnected; public override async Task OpenAsync(CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) { await Task.FromCanceled(cancellationToken); } } public override void Close() { _stream?.Dispose(); } public override async Task ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) { if (_stream == null) { throw new TTransportException(TTransportException.ExceptionType.NotOpen); } return await _stream.ReadAsync(buffer, offset, length, cancellationToken); } public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) { if (_stream == null) { throw new TTransportException(TTransportException.ExceptionType.NotOpen); } await _stream.WriteAsync(buffer, offset, length, cancellationToken); } public override async Task FlushAsync(CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) { await Task.FromCanceled(cancellationToken); } } protected override void Dispose(bool disposing) { _stream?.Dispose(); } } } }