4 using Thrift.Transport;
8 public class TZmqClient : TTransport
// Accumulates the bytes handed to Write until Flush sends them over the socket.
private MemoryStream _wbuf = new MemoryStream ();

// Holds the most recently received message while Read hands it out to the caller.
private MemoryStream _rbuf = new MemoryStream ();
/// <summary>
/// Diagnostic trace hook. Does nothing unless the statement in the body is uncommented.
/// </summary>
/// <param name="msg">Text that would be written to the console.</param>
private void debug (string msg)
{
    // Tracing is off by default. To turn it on, uncomment:
    // Console.WriteLine (msg);
}
/// <summary>
/// Creates a client transport backed by a socket obtained from <paramref name="ctx"/>.
/// The socket is not connected here; the connection is made by <c>Open</c>.
/// </summary>
/// <param name="ctx">Messaging context used to create the socket.</param>
/// <param name="endpoint">Address that <c>Open</c> will connect the socket to.</param>
/// <param name="sockType">Kind of socket to request from the context.</param>
/// <exception cref="ArgumentNullException"><paramref name="ctx"/> is null.</exception>
public TZmqClient (Context ctx, String endpoint, SocketType sockType)
{
    if (ctx == null) {
        throw new ArgumentNullException ("ctx");
    }

    _sock = ctx.Socket (sockType);
    // Remember the address so Open has something to connect to.
    _endpoint = endpoint;
}
/// <summary>
/// Connects the underlying socket to the endpoint held in <c>_endpoint</c>.
/// </summary>
public override void Open ()
{
    _sock.Connect (_endpoint);
}
/// <summary>
/// Not implemented: calling this always throws.
/// </summary>
/// <exception cref="NotImplementedException">Always.</exception>
public override void Close ()
{
    // NOTE(review): the socket is never released by this transport; confirm whether that is intended.
    throw new NotImplementedException ();
}
/// <summary>
/// Not implemented: reading this property always throws.
/// </summary>
/// <exception cref="NotImplementedException">Always.</exception>
public override bool IsOpen {
    get {
        throw new NotImplementedException ();
    }
}
/// <summary>
/// Copies up to <paramref name="len"/> bytes of the buffered response into
/// <paramref name="buf"/>, starting at index <paramref name="off"/>.
/// </summary>
/// <remarks>
/// When the internal read buffer is empty, one message is received from the socket
/// and buffered. Later calls drain that buffer; once it is fully consumed it is
/// reset so that the next call receives a fresh message.
/// </remarks>
/// <param name="buf">Destination array.</param>
/// <param name="off">Index in <paramref name="buf"/> at which to start storing bytes.</param>
/// <param name="len">Maximum number of bytes to copy.</param>
/// <returns>The number of bytes actually copied, which may be less than <paramref name="len"/>.</returns>
public override int Read (byte[] buf, int off, int len)
{
    debug ("Client_Read");

    if (_rbuf.Length == 0) {
        // Nothing buffered: receive the next message. It is assumed to carry a
        // complete Thrift response (NOTE(review): confirm against the server side).
        debug ("Client_Read Filling buffer..");
        byte[] tmpBuf = _sock.Recv ();
        debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length));
        _rbuf.Write (tmpBuf, 0, tmpBuf.Length);
        _rbuf.Position = 0; // Rewind so the data just written can be read back.
    }

    // Honour the caller's offset and length; MemoryStream.Read validates them against buf.
    int ret = _rbuf.Read (buf, off, len);
    long remaining = _rbuf.Length - _rbuf.Position;
    debug (string.Format ("Client_Read return {0}b, remaining {1}b", ret, remaining));

    if (remaining == 0) {
        // Fully drained: empty the buffer so the Length == 0 check above
        // triggers a new receive on the next call.
        _rbuf.SetLength (0);
    }

    return ret;
}
/// <summary>
/// Appends <paramref name="len"/> bytes of <paramref name="buf"/>, starting at
/// <paramref name="off"/>, to the pending write buffer. Nothing is sent by this call.
/// </summary>
/// <param name="buf">Source array.</param>
/// <param name="off">Index of the first byte to append.</param>
/// <param name="len">Number of bytes to append.</param>
public override void Write (byte[] buf, int off, int len)
{
    debug ("Client_Write");
    _wbuf.Write (buf, off, len);
}
/// <summary>
/// Sends everything accumulated by <c>Write</c> as a single socket message and
/// empties the pending write buffer.
/// </summary>
public override void Flush ()
{
    debug ("Client_Flush");

    // ToArray yields exactly the bytes written. GetBuffer would expose the whole
    // backing array, including unused capacity, and so send trailing padding.
    byte[] payload = _wbuf.ToArray ();
    _sock.Send (payload);

    // Start the next message from an empty buffer.
    _wbuf.SetLength (0);
}