]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TNamedPipeServerTransport.cs
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / netcore / Thrift / Transports / Server / TNamedPipeServerTransport.cs
CommitLineData
f67539c2
TL
1// Licensed to the Apache Software Foundation(ASF) under one
2// or more contributor license agreements.See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18using System;
19using System.IO.Pipes;
20using System.Threading;
21using System.Threading.Tasks;
22
23namespace Thrift.Transports.Server
24{
25 // ReSharper disable once InconsistentNaming
26 public class TNamedPipeServerTransport : TServerTransport
27 {
28 /// <summary>
29 /// This is the address of the Pipe on the localhost.
30 /// </summary>
31 private readonly string _pipeAddress;
32
33 private bool _asyncMode = true;
34 private volatile bool _isPending = true;
35
36 private NamedPipeServerStream _stream = null;
37
38 public TNamedPipeServerTransport(string pipeAddress)
39 {
40 _pipeAddress = pipeAddress;
41 }
42
43 public override void Listen()
44 {
45 // nothing to do here
46 }
47
48 public override void Close()
49 {
50 if (_stream != null)
51 {
52 try
53 {
54 //TODO: check for disconection
55 _stream.Disconnect();
56 _stream.Dispose();
57 }
58 finally
59 {
60 _stream = null;
61 _isPending = false;
62 }
63 }
64 }
65
66 public override bool IsClientPending()
67 {
68 return _isPending;
69 }
70
71 private void EnsurePipeInstance()
72 {
73 if (_stream == null)
74 {
75 var direction = PipeDirection.InOut;
76 var maxconn = 254;
77 var mode = PipeTransmissionMode.Byte;
78 var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
79 var inbuf = 4096;
80 var outbuf = 4096;
81 // TODO: security
82
83 try
84 {
85 _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
86 }
87 catch (NotImplementedException) // Mono still does not support async, fallback to sync
88 {
89 if (_asyncMode)
90 {
91 options &= (~PipeOptions.Asynchronous);
92 _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf,
93 outbuf);
94 _asyncMode = false;
95 }
96 else
97 {
98 throw;
99 }
100 }
101 }
102 }
103
104 protected override async Task<TClientTransport> AcceptImplementationAsync(CancellationToken cancellationToken)
105 {
106 try
107 {
108 EnsurePipeInstance();
109
110 await _stream.WaitForConnectionAsync(cancellationToken);
111
112 var trans = new ServerTransport(_stream);
113 _stream = null; // pass ownership to ServerTransport
114
115 //_isPending = false;
116
117 return trans;
118 }
119 catch (TTransportException)
120 {
121 Close();
122 throw;
123 }
124 catch (Exception e)
125 {
126 Close();
127 throw new TTransportException(TTransportException.ExceptionType.NotOpen, e.Message);
128 }
129 }
130
131 private class ServerTransport : TClientTransport
132 {
133 private readonly NamedPipeServerStream _stream;
134
135 public ServerTransport(NamedPipeServerStream stream)
136 {
137 _stream = stream;
138 }
139
140 public override bool IsOpen => _stream != null && _stream.IsConnected;
141
142 public override async Task OpenAsync(CancellationToken cancellationToken)
143 {
144 if (cancellationToken.IsCancellationRequested)
145 {
146 await Task.FromCanceled(cancellationToken);
147 }
148 }
149
150 public override void Close()
151 {
152 _stream?.Dispose();
153 }
154
155 public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
156 CancellationToken cancellationToken)
157 {
158 if (_stream == null)
159 {
160 throw new TTransportException(TTransportException.ExceptionType.NotOpen);
161 }
162
163 return await _stream.ReadAsync(buffer, offset, length, cancellationToken);
164 }
165
166 public override async Task WriteAsync(byte[] buffer, int offset, int length,
167 CancellationToken cancellationToken)
168 {
169 if (_stream == null)
170 {
171 throw new TTransportException(TTransportException.ExceptionType.NotOpen);
172 }
173
174 await _stream.WriteAsync(buffer, offset, length, cancellationToken);
175 }
176
177 public override async Task FlushAsync(CancellationToken cancellationToken)
178 {
179 if (cancellationToken.IsCancellationRequested)
180 {
181 await Task.FromCanceled(cancellationToken);
182 }
183 }
184
185 protected override void Dispose(bool disposing)
186 {
187 _stream?.Dispose();
188 }
189 }
190 }
191}