]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | using System; | |
19 | using System.Text; | |
20 | using System.Threading; | |
21 | using System.Threading.Tasks; | |
22 | using Thrift.Protocol.Entities; | |
23 | using Thrift.Transport; | |
24 | ||
25 | namespace Thrift.Protocol | |
26 | { | |
/// <summary>
/// <see cref="TProtocol"/> implementation that encodes values in a fixed-width binary
/// layout: integers are written most-significant byte first, and byte arrays are
/// written as a 32-bit length followed by the raw bytes.
/// </summary>
/// <remarks>
/// All fixed-width reads and writes share a single scratch buffer, so the visible code
/// gives no protection against concurrent use of one instance.
/// Every operation observes its <see cref="CancellationToken"/> before doing any work and
/// surfaces a pending cancellation by awaiting <see cref="Task.FromCanceled(CancellationToken)"/>.
/// </remarks>
// ReSharper disable once InconsistentNaming
public class TBinaryProtocol : TProtocol
{
    /// <summary>Mask selecting the upper 16 bits of the first message word (the version part).</summary>
    protected const uint VersionMask = 0xffff0000;

    /// <summary>Version marker expected/emitted in the upper 16 bits of a versioned message header.</summary>
    protected const uint Version1 = 0x80010000;

    /// <summary>When true, <see cref="ReadMessageBeginAsync"/> rejects headers that carry no version marker.</summary>
    protected bool StrictRead;

    /// <summary>When true, <see cref="WriteMessageBeginAsync"/> emits the versioned header layout.</summary>
    protected bool StrictWrite;

    // Minimize memory allocations by means of a preallocated bytes buffer.
    // The value of 128 is arbitrarily chosen, the required minimum size must be sizeof(long).
    // The reference is never reassigned, hence readonly; only the contents change.
    private readonly byte[] PreAllocatedBuffer = new byte[128];

    // Shared immutable results for calls that carry no per-call information.
    private static readonly TStruct AnonymousStruct = new TStruct(string.Empty);
    private static readonly TField StopField = new TField() { Type = TType.Stop };

    /// <summary>
    /// Creates a protocol over <paramref name="trans"/> with non-strict reading and strict writing.
    /// </summary>
    /// <param name="trans">Transport that all reads and writes are forwarded to.</param>
    public TBinaryProtocol(TTransport trans)
        : this(trans, false, true)
    {
    }

    /// <summary>
    /// Creates a protocol over <paramref name="trans"/> with explicit strictness settings.
    /// </summary>
    /// <param name="trans">Transport that all reads and writes are forwarded to.</param>
    /// <param name="strictRead">Require a version marker when reading message headers.</param>
    /// <param name="strictWrite">Emit a version marker when writing message headers.</param>
    public TBinaryProtocol(TTransport trans, bool strictRead, bool strictWrite)
        : base(trans)
    {
        StrictRead = strictRead;
        StrictWrite = strictWrite;
    }

    /// <summary>
    /// Writes a message header. In strict mode: (version | type) as i32, name, sequence id.
    /// Otherwise: name, type as a single byte, sequence id.
    /// </summary>
    /// <param name="message">Header values (name, type, sequence id) to write.</param>
    /// <param name="cancellationToken">Token observed before and during the write.</param>
    public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken)
    {
        // Surface cancellation instead of silently skipping the write, so the caller
        // never mistakes an unwritten header for a successful one.
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        if (StrictWrite)
        {
            var version = Version1 | (uint) message.Type;
            await WriteI32Async((int) version, cancellationToken);
            await WriteStringAsync(message.Name, cancellationToken);
            await WriteI32Async(message.SeqID, cancellationToken);
        }
        else
        {
            await WriteStringAsync(message.Name, cancellationToken);
            await WriteByteAsync((sbyte) message.Type, cancellationToken);
            await WriteI32Async(message.SeqID, cancellationToken);
        }
    }

    /// <summary>Writes nothing; only observes cancellation.</summary>
    /// <param name="cancellationToken">Token observed before returning.</param>
    public override async Task WriteMessageEndAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }
    }

    /// <summary>Writes nothing (the struct argument is not used); only observes cancellation.</summary>
    /// <param name="struct">Unused by this protocol.</param>
    /// <param name="cancellationToken">Token observed before returning.</param>
    public override async Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }
    }

    /// <summary>Writes nothing; only observes cancellation.</summary>
    /// <param name="cancellationToken">Token observed before returning.</param>
    public override async Task WriteStructEndAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }
    }

    /// <summary>Writes a field header: the type as one byte followed by the id as i16.</summary>
    /// <param name="field">Field whose type and id are written.</param>
    /// <param name="cancellationToken">Token observed before and during the write.</param>
    public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        await WriteByteAsync((sbyte) field.Type, cancellationToken);
        await WriteI16Async(field.ID, cancellationToken);
    }

    /// <summary>Writes nothing; only observes cancellation.</summary>
    /// <param name="cancellationToken">Token observed before returning.</param>
    public override async Task WriteFieldEndAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }
    }

    /// <summary>Writes the single <see cref="TType.Stop"/> byte that terminates a field list.</summary>
    /// <param name="cancellationToken">Token observed before and during the write.</param>
    public override async Task WriteFieldStopAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        await WriteByteAsync((sbyte) TType.Stop, cancellationToken);
    }

    /// <summary>Writes a map header: key type byte, value type byte, then the count as i32.</summary>
    /// <param name="map">Map descriptor supplying key type, value type and count.</param>
    /// <param name="cancellationToken">Token observed before and during the write.</param>
    public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        // Both type bytes go out in one transport call.
        PreAllocatedBuffer[0] = (byte) map.KeyType;
        PreAllocatedBuffer[1] = (byte) map.ValueType;
        await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken);

        await WriteI32Async(map.Count, cancellationToken);
    }

    /// <summary>Writes nothing; only observes cancellation.</summary>
    /// <param name="cancellationToken">Token observed before returning.</param>
    public override async Task WriteMapEndAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }
    }

    /// <summary>Writes a list header: element type byte followed by the count as i32.</summary>
    /// <param name="list">List descriptor supplying element type and count.</param>
    /// <param name="cancellationToken">Token observed before and during the write.</param>
    public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        await WriteByteAsync((sbyte) list.ElementType, cancellationToken);
        await WriteI32Async(list.Count, cancellationToken);
    }

    /// <summary>Writes nothing; only observes cancellation.</summary>
    /// <param name="cancellationToken">Token observed before returning.</param>
    public override async Task WriteListEndAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }
    }

    /// <summary>Writes a set header: element type byte followed by the count as i32.</summary>
    /// <param name="set">Set descriptor supplying element type and count.</param>
    /// <param name="cancellationToken">Token observed before and during the write.</param>
    public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        await WriteByteAsync((sbyte) set.ElementType, cancellationToken);
        await WriteI32Async(set.Count, cancellationToken);
    }

    /// <summary>Writes nothing; only observes cancellation.</summary>
    /// <param name="cancellationToken">Token observed before returning.</param>
    public override async Task WriteSetEndAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }
    }

    /// <summary>Writes a boolean as one byte: 1 for true, 0 for false.</summary>
    /// <param name="b">Value to write.</param>
    /// <param name="cancellationToken">Token observed before and during the write.</param>
    public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        await WriteByteAsync(b ? (sbyte) 1 : (sbyte) 0, cancellationToken);
    }

    /// <summary>Writes a single byte to the transport.</summary>
    /// <param name="b">Value to write.</param>
    /// <param name="cancellationToken">Token observed before and during the write.</param>
    public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        PreAllocatedBuffer[0] = (byte) b;

        await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
    }

    /// <summary>Writes a 16-bit integer as two bytes, most significant byte first.</summary>
    /// <param name="i16">Value to write.</param>
    /// <param name="cancellationToken">Token observed before and during the write.</param>
    public override async Task WriteI16Async(short i16, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        PreAllocatedBuffer[0] = (byte) (0xff & (i16 >> 8));
        PreAllocatedBuffer[1] = (byte) (0xff & i16);

        await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
    }

    /// <summary>Writes a 32-bit integer as four bytes, most significant byte first.</summary>
    /// <param name="i32">Value to write.</param>
    /// <param name="cancellationToken">Token observed before and during the write.</param>
    public override async Task WriteI32Async(int i32, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        PreAllocatedBuffer[0] = (byte) (0xff & (i32 >> 24));
        PreAllocatedBuffer[1] = (byte) (0xff & (i32 >> 16));
        PreAllocatedBuffer[2] = (byte) (0xff & (i32 >> 8));
        PreAllocatedBuffer[3] = (byte) (0xff & i32);

        await Trans.WriteAsync(PreAllocatedBuffer, 0, 4, cancellationToken);
    }

    /// <summary>Writes a 64-bit integer as eight bytes, most significant byte first.</summary>
    /// <param name="i64">Value to write.</param>
    /// <param name="cancellationToken">Token observed before and during the write.</param>
    public override async Task WriteI64Async(long i64, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        PreAllocatedBuffer[0] = (byte) (0xff & (i64 >> 56));
        PreAllocatedBuffer[1] = (byte) (0xff & (i64 >> 48));
        PreAllocatedBuffer[2] = (byte) (0xff & (i64 >> 40));
        PreAllocatedBuffer[3] = (byte) (0xff & (i64 >> 32));
        PreAllocatedBuffer[4] = (byte) (0xff & (i64 >> 24));
        PreAllocatedBuffer[5] = (byte) (0xff & (i64 >> 16));
        PreAllocatedBuffer[6] = (byte) (0xff & (i64 >> 8));
        PreAllocatedBuffer[7] = (byte) (0xff & i64);

        await Trans.WriteAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
    }

    /// <summary>Writes a double as the i64 holding its bit pattern.</summary>
    /// <param name="d">Value to write.</param>
    /// <param name="cancellationToken">Token observed before and during the write.</param>
    public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        await WriteI64Async(BitConverter.DoubleToInt64Bits(d), cancellationToken);
    }

    /// <summary>Writes a byte array as its length (i32) followed by the raw bytes.</summary>
    /// <param name="bytes">Payload to write; its full length is sent.</param>
    /// <param name="cancellationToken">Token observed before and during the write.</param>
    public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        await WriteI32Async(bytes.Length, cancellationToken);
        await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
    }

    /// <summary>
    /// Reads a message header, accepting both the versioned and the unversioned layout.
    /// </summary>
    /// <param name="cancellationToken">Token observed before and during the read.</param>
    /// <returns>The decoded header (name, type, sequence id).</returns>
    /// <exception cref="TProtocolException">
    /// The version marker does not equal <see cref="Version1"/>, or no marker is present
    /// while <see cref="StrictRead"/> is set.
    /// </exception>
    public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<TMessage>(cancellationToken);
        }

        var message = new TMessage();
        var size = await ReadI32Async(cancellationToken);
        if (size < 0)
        {
            // A negative first word means the high bit is set: versioned header.
            var version = (uint) size & VersionMask;
            if (version != Version1)
            {
                throw new TProtocolException(TProtocolException.BAD_VERSION,
                    $"Bad version in ReadMessageBegin: {version}");
            }
            message.Type = (TMessageType) (size & 0x000000ff);
            message.Name = await ReadStringAsync(cancellationToken);
            message.SeqID = await ReadI32Async(cancellationToken);
        }
        else
        {
            if (StrictRead)
            {
                throw new TProtocolException(TProtocolException.BAD_VERSION,
                    "Missing version in ReadMessageBegin, old client?");
            }
            // Unversioned header: the first word already was the name length.
            message.Name = (size > 0) ? await ReadStringBodyAsync(size, cancellationToken) : string.Empty;
            message.Type = (TMessageType) await ReadByteAsync(cancellationToken);
            message.SeqID = await ReadI32Async(cancellationToken);
        }
        return message;
    }

    /// <summary>Reads nothing; only observes cancellation.</summary>
    /// <param name="cancellationToken">Token observed before returning.</param>
    public override async Task ReadMessageEndAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }
    }

    /// <summary>Reads nothing and returns a shared struct descriptor with an empty name.</summary>
    /// <param name="cancellationToken">Token observed before returning.</param>
    /// <returns>The shared anonymous <see cref="TStruct"/>.</returns>
    public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        return AnonymousStruct;
    }

    /// <summary>Reads nothing; only observes cancellation.</summary>
    /// <param name="cancellationToken">Token observed before returning.</param>
    public override async Task ReadStructEndAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }
    }

    /// <summary>
    /// Reads a field header. A <see cref="TType.Stop"/> type byte yields the shared stop
    /// field without reading an id; any other type is followed by an i16 id.
    /// </summary>
    /// <param name="cancellationToken">Token observed before and during the read.</param>
    /// <returns>The decoded field header.</returns>
    public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<TField>(cancellationToken);
        }

        var type = (TType) await ReadByteAsync(cancellationToken);
        if (type == TType.Stop)
        {
            return StopField;
        }

        return new TField
        {
            Type = type,
            ID = await ReadI16Async(cancellationToken)
        };
    }

    /// <summary>Reads nothing; only observes cancellation.</summary>
    /// <param name="cancellationToken">Token observed before returning.</param>
    public override async Task ReadFieldEndAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }
    }

    /// <summary>Reads a map header: key type byte, value type byte, count as i32.</summary>
    /// <param name="cancellationToken">Token observed before and during the read.</param>
    /// <returns>The decoded map descriptor.</returns>
    public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<TMap>(cancellationToken);
        }

        var map = new TMap
        {
            KeyType = (TType) await ReadByteAsync(cancellationToken),
            ValueType = (TType) await ReadByteAsync(cancellationToken),
            Count = await ReadI32Async(cancellationToken)
        };

        return map;
    }

    /// <summary>Reads nothing; only observes cancellation.</summary>
    /// <param name="cancellationToken">Token observed before returning.</param>
    public override async Task ReadMapEndAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }
    }

    /// <summary>Reads a list header: element type byte followed by the count as i32.</summary>
    /// <param name="cancellationToken">Token observed before and during the read.</param>
    /// <returns>The decoded list descriptor.</returns>
    public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<TList>(cancellationToken);
        }

        var list = new TList
        {
            ElementType = (TType) await ReadByteAsync(cancellationToken),
            Count = await ReadI32Async(cancellationToken)
        };

        return list;
    }

    /// <summary>Reads nothing; only observes cancellation.</summary>
    /// <param name="cancellationToken">Token observed before returning.</param>
    public override async Task ReadListEndAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }
    }

    /// <summary>Reads a set header: element type byte followed by the count as i32.</summary>
    /// <param name="cancellationToken">Token observed before and during the read.</param>
    /// <returns>The decoded set descriptor.</returns>
    public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<TSet>(cancellationToken);
        }

        var set = new TSet
        {
            ElementType = (TType) await ReadByteAsync(cancellationToken),
            Count = await ReadI32Async(cancellationToken)
        };

        return set;
    }

    /// <summary>Reads nothing; only observes cancellation.</summary>
    /// <param name="cancellationToken">Token observed before returning.</param>
    public override async Task ReadSetEndAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }
    }

    /// <summary>Reads one byte and reports whether it equals 1.</summary>
    /// <param name="cancellationToken">Token observed before and during the read.</param>
    /// <returns>True only when the byte read is exactly 1.</returns>
    public override async ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<bool>(cancellationToken);
        }

        return await ReadByteAsync(cancellationToken) == 1;
    }

    /// <summary>Reads a single byte from the transport.</summary>
    /// <param name="cancellationToken">Token observed before and during the read.</param>
    /// <returns>The byte, reinterpreted as signed.</returns>
    public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<sbyte>(cancellationToken);
        }

        await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
        return (sbyte) PreAllocatedBuffer[0];
    }

    /// <summary>Reads two bytes and combines them, most significant byte first, into an i16.</summary>
    /// <param name="cancellationToken">Token observed before and during the read.</param>
    /// <returns>The decoded 16-bit value.</returns>
    public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<short>(cancellationToken);
        }

        await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
        var result = (short) (((PreAllocatedBuffer[0] & 0xff) << 8) | PreAllocatedBuffer[1] & 0xff);
        return result;
    }

    /// <summary>Reads four bytes and combines them, most significant byte first, into an i32.</summary>
    /// <param name="cancellationToken">Token observed before and during the read.</param>
    /// <returns>The decoded 32-bit value.</returns>
    public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<int>(cancellationToken);
        }

        await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 4, cancellationToken);

        var result =
            ((PreAllocatedBuffer[0] & 0xff) << 24) |
            ((PreAllocatedBuffer[1] & 0xff) << 16) |
            ((PreAllocatedBuffer[2] & 0xff) << 8) |
            PreAllocatedBuffer[3] & 0xff;

        return result;
    }

#pragma warning disable 675

    /// <summary>
    /// Combines the first eight bytes of the scratch buffer, most significant byte first,
    /// into an i64. The caller must have filled those bytes beforehand.
    /// </summary>
    /// <returns>The decoded 64-bit value.</returns>
    protected internal long ReadI64FromPreAllocatedBuffer()
    {
        var result =
            ((long) (PreAllocatedBuffer[0] & 0xff) << 56) |
            ((long) (PreAllocatedBuffer[1] & 0xff) << 48) |
            ((long) (PreAllocatedBuffer[2] & 0xff) << 40) |
            ((long) (PreAllocatedBuffer[3] & 0xff) << 32) |
            ((long) (PreAllocatedBuffer[4] & 0xff) << 24) |
            ((long) (PreAllocatedBuffer[5] & 0xff) << 16) |
            ((long) (PreAllocatedBuffer[6] & 0xff) << 8) |
            PreAllocatedBuffer[7] & 0xff;

        return result;
    }

#pragma warning restore 675

    /// <summary>Reads eight bytes and decodes them as an i64.</summary>
    /// <param name="cancellationToken">Token observed before and during the read.</param>
    /// <returns>The decoded 64-bit value.</returns>
    public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<long>(cancellationToken);
        }

        await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
        return ReadI64FromPreAllocatedBuffer();
    }

    /// <summary>Reads an i64 and reinterprets its bit pattern as a double.</summary>
    /// <param name="cancellationToken">Token observed before and during the read.</param>
    /// <returns>The decoded double.</returns>
    public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<double>(cancellationToken);
        }

        var d = await ReadI64Async(cancellationToken);
        return BitConverter.Int64BitsToDouble(d);
    }

    /// <summary>Reads a length (i32) and then that many raw bytes.</summary>
    /// <param name="cancellationToken">Token observed before and during the read.</param>
    /// <returns>A newly allocated array holding the payload.</returns>
    /// <exception cref="TProtocolException">The length read from the transport is negative.</exception>
    public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<byte[]>(cancellationToken);
        }

        var size = await ReadI32Async(cancellationToken);

        // The length comes straight off the wire; reject a negative value with a protocol
        // error instead of letting the array allocation fail with a generic runtime exception.
        if (size < 0)
        {
            throw new TProtocolException(TProtocolException.NEGATIVE_SIZE,
                $"Negative length in ReadBinary: {size}");
        }

        var buf = new byte[size];
        await Trans.ReadAllAsync(buf, 0, size, cancellationToken);
        return buf;
    }

    /// <summary>
    /// Reads a length (i32) and then that many bytes decoded as UTF-8.
    /// A length of zero or less yields the empty string without reading further.
    /// </summary>
    /// <param name="cancellationToken">Token observed before and during the read.</param>
    /// <returns>The decoded string.</returns>
    public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<string>(cancellationToken);
        }

        var size = await ReadI32Async(cancellationToken);
        return size > 0 ? await ReadStringBodyAsync(size, cancellationToken) : string.Empty;
    }

    // Reads exactly 'size' bytes and decodes them as UTF-8. Payloads that fit into the
    // scratch buffer are decoded from there; larger ones get a dedicated array.
    private async ValueTask<string> ReadStringBodyAsync(int size, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled<string>(cancellationToken);
        }

        if (size <= PreAllocatedBuffer.Length)
        {
            await Trans.ReadAllAsync(PreAllocatedBuffer, 0, size, cancellationToken);
            return Encoding.UTF8.GetString(PreAllocatedBuffer, 0, size);
        }

        var buf = new byte[size];
        await Trans.ReadAllAsync(buf, 0, size, cancellationToken);
        return Encoding.UTF8.GetString(buf, 0, buf.Length);
    }

    /// <summary>
    /// Factory producing <see cref="TBinaryProtocol"/> instances that all share the same
    /// strictness settings.
    /// </summary>
    public class Factory : TProtocolFactory
    {
        /// <summary>Strict-read setting passed to every protocol created.</summary>
        protected bool StrictRead;

        /// <summary>Strict-write setting passed to every protocol created.</summary>
        protected bool StrictWrite;

        /// <summary>Creates a factory with non-strict reading and strict writing.</summary>
        public Factory()
            : this(false, true)
        {
        }

        /// <summary>Creates a factory with explicit strictness settings.</summary>
        /// <param name="strictRead">Require a version marker when reading message headers.</param>
        /// <param name="strictWrite">Emit a version marker when writing message headers.</param>
        public Factory(bool strictRead, bool strictWrite)
        {
            StrictRead = strictRead;
            StrictWrite = strictWrite;
        }

        /// <summary>Creates a new <see cref="TBinaryProtocol"/> over <paramref name="trans"/>.</summary>
        /// <param name="trans">Transport the new protocol reads from and writes to.</param>
        /// <returns>A protocol configured with this factory's strictness settings.</returns>
        public override TProtocol GetProtocol(TTransport trans)
        {
            return new TBinaryProtocol(trans, StrictRead, StrictWrite);
        }
    }
}
600 | } |