// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Thrift.Protocols.Entities;
using Thrift.Transports;
26 namespace Thrift.Protocols
// TODO: implementation of TProtocol
// ReSharper disable once InconsistentNaming
31 public class TCompactProtocol : TProtocol
// Message-header constants: first byte on the wire, then version/type packed into one byte.
private const byte ProtocolId = 0x82;
private const byte Version = 1;
private const byte VersionMask = 0x1f; // 0001 1111
private const byte TypeMask = 0xE0; // 1110 0000
private const byte TypeBits = 0x07; // 0000 0111
private const int TypeShiftAmount = 5;

// Shared, reusable header values (struct names are not carried by this protocol).
private static readonly TStruct AnonymousStruct = new TStruct(string.Empty);
private static readonly TField Tstop = new TField(string.Empty, TType.Stop, 0);

// Lookup table indexed by (int) TType that yields the matching Types constant.
// ReSharper disable once InconsistentNaming
private static readonly byte[] TTypeToCompactType = new byte[16];

/// <summary>
///     Used to keep track of the last field for the current and previous structs, so we can do the delta stuff.
/// </summary>
private readonly Stack<short> _lastField = new Stack<short>(15);

/// <summary>
///     If we encounter a boolean field begin, save the TField here so it can have the value incorporated.
/// </summary>
private TField? _booleanField;

/// <summary>
///     If we Read a field header, and it's a boolean field, save the boolean value here so that ReadBool can use it.
/// </summary>
private bool? _boolValue;

/// <summary>
///     Id of the most recently written or read field in the current struct; base for delta encoding.
/// </summary>
private short _lastFieldId;
/// <summary>
///     Fills the shared TType-to-compact-type lookup table once per process.
///     The table is static, so populating it from the instance constructor
///     repeated the same writes for every protocol object created.
/// </summary>
static TCompactProtocol()
{
    TTypeToCompactType[(int) TType.Stop] = Types.Stop;
    // Bool maps to BooleanTrue; the actual true/false code is chosen when the value is written.
    TTypeToCompactType[(int) TType.Bool] = Types.BooleanTrue;
    TTypeToCompactType[(int) TType.Byte] = Types.Byte;
    TTypeToCompactType[(int) TType.I16] = Types.I16;
    TTypeToCompactType[(int) TType.I32] = Types.I32;
    TTypeToCompactType[(int) TType.I64] = Types.I64;
    TTypeToCompactType[(int) TType.Double] = Types.Double;
    TTypeToCompactType[(int) TType.String] = Types.Binary;
    TTypeToCompactType[(int) TType.List] = Types.List;
    TTypeToCompactType[(int) TType.Set] = Types.Set;
    TTypeToCompactType[(int) TType.Map] = Types.Map;
    TTypeToCompactType[(int) TType.Struct] = Types.Struct;
}

/// <summary>
///     Creates a compact protocol bound to the given transport.
/// </summary>
/// <param name="trans">Transport that all reads and writes go through.</param>
public TCompactProtocol(TClientTransport trans)
    : base(trans)
{
}
/// <summary>
///     Writes the message header: protocol id byte, a byte packing version and
///     message type, the sequence id as a varint, and the message name.
/// </summary>
/// <param name="message">Message whose type, sequence id and name are written.</param>
/// <param name="cancellationToken">When already cancelled, nothing is written.</param>
public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    await Trans.WriteAsync(new[] {ProtocolId}, cancellationToken);

    // Low 5 bits carry the version, high 3 bits carry the message type.
    var versionAndType =
        (byte) ((Version & VersionMask) | (((uint) message.Type << TypeShiftAmount) & TypeMask));
    await Trans.WriteAsync(new[] {versionAndType}, cancellationToken);

    var bufferTuple = CreateWriteVarInt32((uint) message.SeqID);
    await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);

    await WriteStringAsync(message.Name, cancellationToken);
}
/// <summary>
///     Writes nothing; only observes the cancellation token.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
public override async Task WriteMessageEndAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled(cancellationToken);
    }
}
/// <summary>
///     Write a struct begin. This doesn't actually put anything on the wire. We
///     use it as an opportunity to put special placeholder markers on the field
///     stack so we can get the field id deltas correct.
/// </summary>
/// <param name="struct">Struct header; not used by this method.</param>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
public override async Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled(cancellationToken);
    }

    // Remember the enclosing struct's position and start the nested struct from field id 0.
    _lastField.Push(_lastFieldId);
    _lastFieldId = 0;
}
/// <summary>
///     Writes nothing; restores the enclosing struct's last field id from the field stack.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
public override async Task WriteStructEndAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled(cancellationToken);
    }

    _lastFieldId = _lastField.Pop();
}
/// <summary>
///     Writes a field header, using the one-byte delta form when the field id
///     is 1..15 above the previous field id and the two-part form otherwise.
/// </summary>
/// <param name="field">Field whose id (and, without an override, type) is written.</param>
/// <param name="typeOverride">Compact type code to write, or 0xFF to derive it from <paramref name="field" />.</param>
/// <param name="cancellationToken">Token passed through to the transport writes.</param>
private async Task WriteFieldBeginInternalAsync(TField field, byte typeOverride,
    CancellationToken cancellationToken)
{
    // if there's a type override, use that.
    var typeToWrite = typeOverride == 0xFF ? GetCompactType(field.Type) : typeOverride;

    // check if we can use delta encoding for the field id
    var delta = field.ID - _lastFieldId;
    if ((field.ID > _lastFieldId) && (delta <= 15))
    {
        // Write them together: delta in the high nibble, type in the low nibble.
        var header = (byte) ((delta << 4) | typeToWrite);
        await Trans.WriteAsync(new[] {header}, cancellationToken);
    }
    else
    {
        // Write them separate: type byte, then the full field id.
        await Trans.WriteAsync(new[] {typeToWrite}, cancellationToken);
        await WriteI16Async(field.ID, cancellationToken);
    }

    _lastFieldId = field.ID;
}
/// <summary>
///     Starts a field. Boolean fields are only remembered here, because their
///     header is written by WriteBoolAsync once the value is known; every other
///     type has its header written immediately.
/// </summary>
/// <param name="field">Field being started.</param>
/// <param name="cancellationToken">Token passed through to the header write.</param>
public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken)
{
    if (field.Type == TType.Bool)
    {
        _booleanField = field;
    }
    else
    {
        // 0xFF means "no override": derive the compact type from the field.
        await WriteFieldBeginInternalAsync(field, 0xFF, cancellationToken);
    }
}
/// <summary>
///     Writes nothing; only observes the cancellation token.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
public override async Task WriteFieldEndAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled(cancellationToken);
    }
}
/// <summary>
///     Writes the single stop byte that terminates a struct's field list.
/// </summary>
/// <param name="cancellationToken">When already cancelled, nothing is written.</param>
public override async Task WriteFieldStopAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    await Trans.WriteAsync(new[] {Types.Stop}, cancellationToken);
}
/// <summary>
///     Writes the header shared by lists and sets. Sizes up to 14 are packed
///     into the high nibble of the element-type byte; larger sizes write 0xF in
///     the high nibble followed by the size as a varint.
/// </summary>
/// <param name="elemType">Element type of the collection.</param>
/// <param name="size">Number of elements.</param>
/// <param name="cancellationToken">When already cancelled, nothing is written.</param>
protected async Task WriteCollectionBeginAsync(TType elemType, int size, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    /*
    Abstract method for writing the start of lists and sets. List and sets on
    the wire differ only by the type indicator.
    */

    if (size <= 14)
    {
        await Trans.WriteAsync(new[] {(byte) ((size << 4) | GetCompactType(elemType))}, cancellationToken);
    }
    else
    {
        await Trans.WriteAsync(new[] {(byte) (0xf0 | GetCompactType(elemType))}, cancellationToken);

        var bufferTuple = CreateWriteVarInt32((uint) size);
        await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
    }
}
/// <summary>
///     Writes a list header by delegating to WriteCollectionBeginAsync.
/// </summary>
/// <param name="list">List header supplying element type and count.</param>
/// <param name="cancellationToken">Token passed through to the collection writer.</param>
public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken)
{
    await WriteCollectionBeginAsync(list.ElementType, list.Count, cancellationToken);
}
/// <summary>
///     Writes nothing; only observes the cancellation token.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
public override async Task WriteListEndAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled(cancellationToken);
    }
}
/// <summary>
///     Writes a set header by delegating to WriteCollectionBeginAsync.
/// </summary>
/// <param name="set">Set header supplying element type and count.</param>
/// <param name="cancellationToken">When already cancelled, nothing is written.</param>
public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    await WriteCollectionBeginAsync(set.ElementType, set.Count, cancellationToken);
}
/// <summary>
///     Writes nothing; only observes the cancellation token.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
public override async Task WriteSetEndAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled(cancellationToken);
    }
}
/// <summary>
///     Writes a boolean. If a boolean field header is pending, the value is
///     folded into that header's type code; otherwise a single byte is written.
/// </summary>
/// <param name="b">Value to write.</param>
/// <param name="cancellationToken">When already cancelled, nothing is written.</param>
public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    /*
    Write a boolean value. Potentially, this could be a boolean field, in
    which case the field header info isn't written yet. If so, decide what the
    right type header is for the value and then Write the field header.
    Otherwise, Write a single byte.
    */

    var boolCode = b ? Types.BooleanTrue : Types.BooleanFalse;

    if (_booleanField != null)
    {
        // we haven't written the field header yet
        await WriteFieldBeginInternalAsync(_booleanField.Value, boolCode, cancellationToken);
        _booleanField = null;
    }
    else
    {
        // we're not part of a field, so just Write the value.
        await Trans.WriteAsync(new[] {boolCode}, cancellationToken);
    }
}
/// <summary>
///     Writes one signed byte as a single raw byte.
/// </summary>
/// <param name="b">Value to write.</param>
/// <param name="cancellationToken">When already cancelled, nothing is written.</param>
public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    await Trans.WriteAsync(new[] {(byte) b}, cancellationToken);
}
/// <summary>
///     Writes a 16-bit integer as a zigzag-encoded varint.
/// </summary>
/// <param name="i16">Value to write.</param>
/// <param name="cancellationToken">When already cancelled, nothing is written.</param>
public override async Task WriteI16Async(short i16, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    var bufferTuple = CreateWriteVarInt32(IntToZigzag(i16));
    await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
}
/// <summary>
///     Encodes an unsigned 32-bit value as a varint: 7 payload bits per byte,
///     least-significant group first, high bit set on every byte but the last.
/// </summary>
/// <param name="n">Value to encode.</param>
/// <returns>A 5-byte buffer and the number of leading bytes (1-5) that are used.</returns>
protected internal Tuple<byte[], int> CreateWriteVarInt32(uint n)
{
    // Write an i32 as a varint. Results in 1 - 5 bytes on the wire.
    var i32Buf = new byte[5];
    var idx = 0;

    while (true)
    {
        if ((n & ~0x7F) == 0)
        {
            // Last group: fits in 7 bits, no continuation flag.
            i32Buf[idx++] = (byte) n;
            break;
        }

        i32Buf[idx++] = (byte) ((n & 0x7F) | 0x80);
        n >>= 7;
    }

    return new Tuple<byte[], int>(i32Buf, idx);
}
/// <summary>
///     Writes a 32-bit integer as a zigzag-encoded varint.
/// </summary>
/// <param name="i32">Value to write.</param>
/// <param name="cancellationToken">When already cancelled, nothing is written.</param>
public override async Task WriteI32Async(int i32, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    var bufferTuple = CreateWriteVarInt32(IntToZigzag(i32));
    await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
}
/// <summary>
///     Encodes an unsigned 64-bit value as a varint: 7 payload bits per byte,
///     least-significant group first, high bit set on every byte but the last.
/// </summary>
/// <param name="n">Value to encode.</param>
/// <returns>A 10-byte buffer and the number of leading bytes (1-10) that are used.</returns>
protected internal Tuple<byte[], int> CreateWriteVarInt64(ulong n)
{
    // Write an i64 as a varint. Results in 1-10 bytes on the wire.
    var buf = new byte[10];
    var idx = 0;

    while (true)
    {
        if ((n & ~(ulong) 0x7FL) == 0)
        {
            // Last group: fits in 7 bits, no continuation flag.
            buf[idx++] = (byte) n;
            break;
        }

        buf[idx++] = (byte) ((n & 0x7F) | 0x80);
        n >>= 7;
    }

    return new Tuple<byte[], int>(buf, idx);
}
/// <summary>
///     Writes a 64-bit integer as a zigzag-encoded varint.
/// </summary>
/// <param name="i64">Value to write.</param>
/// <param name="cancellationToken">When already cancelled, nothing is written.</param>
public override async Task WriteI64Async(long i64, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    var bufferTuple = CreateWriteVarInt64(LongToZigzag(i64));
    await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
}
/// <summary>
///     Writes a double as its 64-bit pattern in 8 little-endian bytes.
/// </summary>
/// <param name="d">Value to write.</param>
/// <param name="cancellationToken">When already cancelled, nothing is written.</param>
public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    var data = new byte[8];
    FixedLongToBytes(BitConverter.DoubleToInt64Bits(d), data, 0);
    await Trans.WriteAsync(data, cancellationToken);
}
/// <summary>
///     Writes a string as a varint byte count followed by its UTF-8 bytes.
/// </summary>
/// <param name="str">Text to write.</param>
/// <param name="cancellationToken">When already cancelled, nothing is written.</param>
public override async Task WriteStringAsync(string str, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    var bytes = Encoding.UTF8.GetBytes(str);

    // The length prefix counts encoded bytes, not characters.
    var bufferTuple = CreateWriteVarInt32((uint) bytes.Length);
    await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
    await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
}
/// <summary>
///     Writes a byte array as a varint length followed by the raw bytes.
/// </summary>
/// <param name="bytes">Payload to write.</param>
/// <param name="cancellationToken">When already cancelled, nothing is written.</param>
public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    var bufferTuple = CreateWriteVarInt32((uint) bytes.Length);
    await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
    await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
}
/// <summary>
///     Writes a map header. An empty map is a single zero byte; otherwise the
///     count is written as a varint followed by one byte holding the key type
///     in the high nibble and the value type in the low nibble.
/// </summary>
/// <param name="map">Map header supplying count, key type and value type.</param>
/// <param name="cancellationToken">When already cancelled, nothing is written.</param>
public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    if (map.Count == 0)
    {
        await Trans.WriteAsync(new[] {(byte) 0}, cancellationToken);
    }
    else
    {
        var bufferTuple = CreateWriteVarInt32((uint) map.Count);
        await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);

        var keyAndValueType = (byte) ((GetCompactType(map.KeyType) << 4) | GetCompactType(map.ValueType));
        await Trans.WriteAsync(new[] {keyAndValueType}, cancellationToken);
    }
}
/// <summary>
///     Writes nothing; only observes the cancellation token.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
public override async Task WriteMapEndAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled(cancellationToken);
    }
}
/// <summary>
///     Reads and validates a message header: protocol id, version/type byte,
///     sequence id varint and message name.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
/// <returns>The decoded message header.</returns>
/// <exception cref="TProtocolException">The protocol id or version does not match.</exception>
public override async Task<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<TMessage>(cancellationToken);
    }

    var protocolId = (byte) await ReadByteAsync(cancellationToken);
    if (protocolId != ProtocolId)
    {
        throw new TProtocolException($"Expected protocol id {ProtocolId:X} but got {protocolId:X}");
    }

    var versionAndType = (byte) await ReadByteAsync(cancellationToken);
    var version = (byte) (versionAndType & VersionMask);

    if (version != Version)
    {
        throw new TProtocolException($"Expected version {Version} but got {version}");
    }

    // The message type lives in the top 3 bits of the same byte.
    var type = (byte) ((versionAndType >> TypeShiftAmount) & TypeBits);
    var seqid = (int) await ReadVarInt32Async(cancellationToken);
    var messageName = await ReadStringAsync(cancellationToken);

    return new TMessage(messageName, (TMessageType) type, seqid);
}
/// <summary>
///     Reads nothing; only observes the cancellation token.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
public override async Task ReadMessageEndAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled(cancellationToken);
    }
}
/// <summary>
///     Starts reading a struct. Consumes no wire data; saves the enclosing
///     struct's last field id and restarts delta tracking from 0.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
/// <returns>The shared anonymous struct header.</returns>
public override async Task<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<TStruct>(cancellationToken);
    }

    // Mirror of WriteStructBeginAsync: push the outer position, reset for the nested struct.
    _lastField.Push(_lastFieldId);
    _lastFieldId = 0;

    return AnonymousStruct;
}
/// <summary>
///     Finishes reading a struct by restoring the enclosing struct's last field id.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
public override async Task ReadStructEndAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled(cancellationToken);
    }

    /*
    Doesn't actually consume any wire data, just removes the last field for
    this struct from the field stack.
    */

    // consume the last field we Read off the wire.
    _lastFieldId = _lastField.Pop();
}
/// <summary>
///     Reads a field header. The low nibble is the compact type; a non-zero
///     high nibble is a delta from the previous field id, while zero means a
///     full field id follows. For boolean fields the value is taken from the
///     type code and stashed for ReadBoolAsync.
/// </summary>
/// <param name="cancellationToken">Token passed through to the underlying reads.</param>
/// <returns>The decoded field header, or the stop field at the end of a struct.</returns>
public override async Task<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
{
    // Read a field header off the wire.
    var type = (byte) await ReadByteAsync(cancellationToken);

    // if it's a stop, then we can return immediately, as the struct is over.
    if (type == Types.Stop)
    {
        return Tstop;
    }

    short fieldId;

    // mask off the 4 MSB of the type header. it could contain a field id delta.
    var modifier = (short) ((type & 0xf0) >> 4);
    if (modifier == 0)
    {
        // not a delta. look ahead for the zigzag varint field id.
        fieldId = await ReadI16Async(cancellationToken);
    }
    else
    {
        fieldId = (short) (_lastFieldId + modifier);
    }

    var field = new TField(string.Empty, GetTType((byte) (type & 0x0f)), fieldId);

    // if this happens to be a boolean field, the value is encoded in the type
    if (IsBoolType(type))
    {
        _boolValue = (byte) (type & 0x0f) == Types.BooleanTrue;
    }

    // remember this field id so we can keep the deltas going.
    _lastFieldId = field.ID;
    return field;
}
/// <summary>
///     Reads nothing; only observes the cancellation token.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
public override async Task ReadFieldEndAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled(cancellationToken);
    }
}
/// <summary>
///     Reads a map header: a varint size and, when the size is non-zero, one
///     byte with the key type in the high nibble and the value type in the low nibble.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
/// <returns>The decoded map header.</returns>
public override async Task<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        // Explicit return, matching the other Read* methods.
        return await Task.FromCanceled<TMap>(cancellationToken);
    }

    /*
    Read a map header off the wire. If the size is zero, skip Reading the key
    and value type. This means that 0-length maps will yield TMaps without the
    "correct" types.
    */

    var size = (int) await ReadVarInt32Async(cancellationToken);
    var keyAndValueType = size == 0 ? (byte) 0 : (byte) await ReadByteAsync(cancellationToken);
    return new TMap(GetTType((byte) (keyAndValueType >> 4)), GetTType((byte) (keyAndValueType & 0xf)), size);
}
/// <summary>
///     Reads nothing; only observes the cancellation token.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
public override async Task ReadMapEndAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled(cancellationToken);
    }
}
/// <summary>
///     Reads a set header by reading a list header and wrapping it in a TSet.
/// </summary>
/// <param name="cancellationToken">Token passed through to ReadListBeginAsync.</param>
/// <returns>The decoded set header.</returns>
public override async Task<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
{
    /*
    Read a set header off the wire. If the set size is 0-14, the size will
    be packed into the element type header. If it's a longer set, the 4 MSB
    of the element type header will be 0xF, and a varint will follow with the
    true size.
    */

    return new TSet(await ReadListBeginAsync(cancellationToken));
}
/// <summary>
///     Reads a boolean. Uses (and clears) the value stashed by
///     ReadFieldBeginAsync when there is one; otherwise reads one byte.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
/// <returns>The decoded boolean.</returns>
public override async Task<bool> ReadBoolAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<bool>(cancellationToken);
    }

    /*
    Read a boolean off the wire. If this is a boolean field, the value should
    already have been Read during ReadFieldBegin, so we'll just consume the
    pre-stored value. Otherwise, Read a byte.
    */

    if (_boolValue != null)
    {
        var result = _boolValue.Value;
        _boolValue = null;
        return result;
    }

    return await ReadByteAsync(cancellationToken) == Types.BooleanTrue;
}
/// <summary>
///     Reads exactly one byte from the transport and returns it as a signed byte.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
/// <returns>The byte read.</returns>
public override async Task<sbyte> ReadByteAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<sbyte>(cancellationToken);
    }

    // Read a single byte off the wire. Nothing interesting here.
    var buf = new byte[1];
    await Trans.ReadAllAsync(buf, 0, 1, cancellationToken);
    return (sbyte) buf[0];
}
/// <summary>
///     Reads a zigzag-encoded varint and narrows it to 16 bits.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
/// <returns>The decoded value.</returns>
public override async Task<short> ReadI16Async(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<short>(cancellationToken);
    }

    return (short) ZigzagToInt(await ReadVarInt32Async(cancellationToken));
}
/// <summary>
///     Reads a zigzag-encoded varint as a 32-bit integer.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
/// <returns>The decoded value.</returns>
public override async Task<int> ReadI32Async(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<int>(cancellationToken);
    }

    return ZigzagToInt(await ReadVarInt32Async(cancellationToken));
}
/// <summary>
///     Reads a zigzag-encoded varint as a 64-bit integer.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
/// <returns>The decoded value.</returns>
public override async Task<long> ReadI64Async(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<long>(cancellationToken);
    }

    return ZigzagToLong(await ReadVarInt64Async(cancellationToken));
}
/// <summary>
///     Reads 8 little-endian bytes and reinterprets them as a double.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
/// <returns>The decoded value.</returns>
public override async Task<double> ReadDoubleAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<double>(cancellationToken);
    }

    var longBits = new byte[8];
    await Trans.ReadAllAsync(longBits, 0, 8, cancellationToken);

    return BitConverter.Int64BitsToDouble(BytesToLong(longBits));
}
/// <summary>
///     Reads a varint byte count followed by that many bytes and decodes them as UTF-8.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
/// <returns>The decoded text; the empty string for a zero length.</returns>
/// <exception cref="TProtocolException">The length prefix does not fit in a non-negative int.</exception>
public override async Task<string> ReadStringAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        // Explicit return, matching the other Read* methods.
        return await Task.FromCanceled<string>(cancellationToken);
    }

    // Reads a length-prefixed byte[] and then UTF-8 decodes it.
    var length = (int) await ReadVarInt32Async(cancellationToken);

    // A prefix above int.MaxValue wraps negative in the cast; reject it
    // instead of letting the array allocation fail with an unrelated exception.
    if (length < 0)
    {
        throw new TProtocolException($"Negative string length: {length}");
    }

    if (length == 0)
    {
        return string.Empty;
    }

    var buf = new byte[length];
    await Trans.ReadAllAsync(buf, 0, length, cancellationToken);

    return Encoding.UTF8.GetString(buf);
}
/// <summary>
///     Reads a varint length followed by that many raw bytes.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
/// <returns>The bytes read; an empty array for a zero length.</returns>
/// <exception cref="TProtocolException">The length prefix does not fit in a non-negative int.</exception>
public override async Task<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<byte[]>(cancellationToken);
    }

    // Read a byte[] from the wire.
    var length = (int) await ReadVarInt32Async(cancellationToken);

    // A prefix above int.MaxValue wraps negative in the cast; reject it
    // instead of letting the array allocation fail with an unrelated exception.
    if (length < 0)
    {
        throw new TProtocolException($"Negative binary length: {length}");
    }

    if (length == 0)
    {
        return new byte[0];
    }

    var buf = new byte[length];
    await Trans.ReadAllAsync(buf, 0, length, cancellationToken);
    return buf;
}
/// <summary>
///     Reads a list header: one byte with the size in the high nibble and the
///     element type in the low nibble; a high nibble of 15 means the real size
///     follows as a varint.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
/// <returns>The decoded list header.</returns>
public override async Task<TList> ReadListBeginAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        // Explicit return, matching the other Read* methods.
        return await Task.FromCanceled<TList>(cancellationToken);
    }

    /*
    Read a list header off the wire. If the list size is 0-14, the size will
    be packed into the element type header. If it's a longer list, the 4 MSB
    of the element type header will be 0xF, and a varint will follow with the
    true size.
    */

    var sizeAndType = (byte) await ReadByteAsync(cancellationToken);
    var size = (sizeAndType >> 4) & 0x0f;
    if (size == 15)
    {
        size = (int) await ReadVarInt32Async(cancellationToken);
    }

    // GetTType masks to the low nibble itself.
    var type = GetTType(sizeAndType);
    return new TList(type, size);
}
/// <summary>
///     Reads nothing; only observes the cancellation token.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
public override async Task ReadListEndAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled(cancellationToken);
    }
}
/// <summary>
///     Reads nothing; only observes the cancellation token.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
public override async Task ReadSetEndAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled(cancellationToken);
    }
}
/// <summary>
///     Given a TType value, find the appropriate TCompactProtocol.Types constant
///     by indexing the TTypeToCompactType table.
/// </summary>
/// <param name="ttype">Thrift type to translate.</param>
/// <returns>The compact wire type code.</returns>
private static byte GetCompactType(TType ttype)
{
    return TTypeToCompactType[(int) ttype];
}
/// <summary>
///     Reads an unsigned 32-bit varint: 7 payload bits per byte, least-significant
///     group first, high bit set while more bytes follow.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
/// <returns>The decoded value.</returns>
/// <exception cref="TProtocolException">The varint continues past 5 bytes.</exception>
private async Task<uint> ReadVarInt32Async(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<uint>(cancellationToken);
    }

    /*
    Read an i32 from the wire as a varint. The MSB of each byte is set
    if there is another byte to follow. This can Read up to 5 bytes.
    */

    uint result = 0;
    var shift = 0;

    while (true)
    {
        var b = (byte) await ReadByteAsync(cancellationToken);
        result |= (uint) (b & 0x7f) << shift;
        if ((b & 0x80) != 0x80)
        {
            break;
        }

        shift += 7;

        // C# masks 32-bit shift counts to 5 bits, so a 6th byte would be
        // OR-ed into the low bits; treat an over-long varint as malformed.
        if (shift > 28)
        {
            throw new TProtocolException("Variable-length int32 is longer than 5 bytes");
        }
    }

    return result;
}
/// <summary>
///     Reads an unsigned 64-bit varint: 7 payload bits per byte, least-significant
///     group first, high bit set while more bytes follow.
/// </summary>
/// <param name="cancellationToken">When already cancelled, a cancelled task is awaited.</param>
/// <returns>The decoded value.</returns>
/// <exception cref="TProtocolException">The varint continues past 10 bytes.</exception>
private async Task<ulong> ReadVarInt64Async(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        // Type argument matches the method's result type.
        return await Task.FromCanceled<ulong>(cancellationToken);
    }

    /*
    Read an i64 from the wire as a proper varint. The MSB of each byte is set
    if there is another byte to follow. This can Read up to 10 bytes.
    */

    var shift = 0;
    ulong result = 0;

    while (true)
    {
        var b = (byte) await ReadByteAsync(cancellationToken);
        result |= (ulong) (b & 0x7f) << shift;
        if ((b & 0x80) != 0x80)
        {
            break;
        }

        shift += 7;

        // C# masks 64-bit shift counts to 6 bits, so an 11th byte would be
        // OR-ed into the low bits; treat an over-long varint as malformed.
        if (shift > 63)
        {
            throw new TProtocolException("Variable-length int64 is longer than 10 bytes");
        }
    }

    return result;
}
/// <summary>
///     Decodes a zigzag-encoded 32-bit value: even inputs map to n / 2,
///     odd inputs map to -(n + 1) / 2.
/// </summary>
/// <param name="n">Zigzag-encoded value.</param>
/// <returns>The signed value.</returns>
private static int ZigzagToInt(uint n)
{
    // unchecked: pure bit manipulation, must not throw in a /checked build.
    return unchecked((int) (n >> 1) ^ -(int) (n & 1));
}
/// <summary>
///     Decodes a zigzag-encoded 64-bit value: even inputs map to n / 2,
///     odd inputs map to -(n + 1) / 2.
/// </summary>
/// <param name="n">Zigzag-encoded value.</param>
/// <returns>The signed value.</returns>
private static long ZigzagToLong(ulong n)
{
    // unchecked: pure bit manipulation, must not throw in a /checked build.
    return unchecked((long) (n >> 1) ^ -(long) (n & 1));
}
/// <summary>
///     Assembles the first 8 bytes of <paramref name="bytes" /> into a long,
///     treating bytes[0] as the least-significant byte (little-endian).
/// </summary>
/// <param name="bytes">Buffer holding at least 8 bytes.</param>
/// <returns>The assembled 64-bit value.</returns>
private static long BytesToLong(byte[] bytes)
{
    /*
    Note that it's important that the mask bytes are long literals,
    otherwise they'll default to ints, and when you shift an int left 56 bits,
    you just get a messed up int.
    */

    long value = 0;

    // Walk from the most-significant byte (index 7) down, shifting earlier bytes up.
    for (var i = 7; i >= 0; i--)
    {
        value = (value << 8) | (bytes[i] & 0xffL);
    }

    return value;
}
/// <summary>
///     Tells whether the low nibble of a field header byte is one of the two boolean type codes.
/// </summary>
/// <param name="b">Field header byte.</param>
/// <returns>True for Types.BooleanTrue or Types.BooleanFalse in the low nibble.</returns>
private static bool IsBoolType(byte b)
{
    var lowerNibble = b & 0x0f;
    return (lowerNibble == Types.BooleanTrue) || (lowerNibble == Types.BooleanFalse);
}
/// <summary>
///     Given a TCompactProtocol.Types constant, convert it to its corresponding TType value.
///     Only the low nibble of <paramref name="type" /> is examined.
/// </summary>
/// <param name="type">Byte whose low nibble holds a compact type code.</param>
/// <returns>The matching TType.</returns>
/// <exception cref="TProtocolException">The low nibble is not a known compact type code.</exception>
private static TType GetTType(byte type)
{
    switch ((byte) (type & 0x0f))
    {
        case Types.Stop:
            return TType.Stop;
        case Types.BooleanFalse:
        case Types.BooleanTrue:
            return TType.Bool;
        case Types.Byte:
            return TType.Byte;
        case Types.I16:
            return TType.I16;
        case Types.I32:
            return TType.I32;
        case Types.I64:
            return TType.I64;
        case Types.Double:
            return TType.Double;
        case Types.Binary:
            return TType.String;
        case Types.List:
            return TType.List;
        case Types.Set:
            return TType.Set;
        case Types.Map:
            return TType.Map;
        case Types.Struct:
            return TType.Struct;
        default:
            throw new TProtocolException($"Don't know what exType: {(byte) (type & 0x0f)}");
    }
}
/// <summary>
///     Convert n into a zigzag long. This allows negative numbers to be
///     represented compactly as a varint.
/// </summary>
/// <param name="n">Signed value.</param>
/// <returns>The zigzag-encoded value.</returns>
private static ulong LongToZigzag(long n)
{
    // unchecked: the casts reinterpret negative values and must not throw in a /checked build.
    return unchecked((ulong) (n << 1) ^ (ulong) (n >> 63));
}
/// <summary>
///     Convert n into a zigzag int. This allows negative numbers to be
///     represented compactly as a varint.
/// </summary>
/// <param name="n">Signed value.</param>
/// <returns>The zigzag-encoded value.</returns>
private static uint IntToZigzag(int n)
{
    // unchecked: the casts reinterpret negative values and must not throw in a /checked build.
    return unchecked((uint) (n << 1) ^ (uint) (n >> 31));
}
/// <summary>
///     Convert a long into little-endian bytes in buf starting at off and going until off+7.
/// </summary>
/// <param name="n">Value to serialise.</param>
/// <param name="buf">Destination buffer; must have at least off + 8 bytes.</param>
/// <param name="off">Index in <paramref name="buf" /> that receives the least-significant byte.</param>
private static void FixedLongToBytes(long n, byte[] buf, int off)
{
    for (var i = 0; i < 8; i++)
    {
        // Byte i holds bits [8i, 8i+7]; the mask keeps the cast in range.
        buf[off + i] = (byte) ((n >> (8 * i)) & 0xff);
    }
}
/// <summary>
///     Protocol factory that produces TCompactProtocol instances.
/// </summary>
public class Factory : ITProtocolFactory
{
    /// <summary>
    ///     Creates a compact protocol over the given transport.
    /// </summary>
    /// <param name="trans">Transport the new protocol reads from and writes to.</param>
    /// <returns>A new TCompactProtocol.</returns>
    public TProtocol GetProtocol(TClientTransport trans)
    {
        return new TCompactProtocol(trans);
    }
}
/// <summary>
///     All of the on-wire type codes. Each fits in the low nibble of a header byte.
/// </summary>
private static class Types
{
    public const byte Stop = 0x00;
    public const byte BooleanTrue = 0x01;
    public const byte BooleanFalse = 0x02;
    public const byte Byte = 0x03;
    public const byte I16 = 0x04;
    public const byte I32 = 0x05;
    public const byte I64 = 0x06;
    public const byte Double = 0x07;
    public const byte Binary = 0x08;
    public const byte List = 0x09;
    public const byte Set = 0x0A;
    public const byte Map = 0x0B;
    public const byte Struct = 0x0C;
}