]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | using System; |
2 | using ZMQ; | |
3 | using System.IO; | |
4 | using Thrift.Transport; | |
5 | ||
6 | namespace ZmqClient | |
7 | { | |
8 | public class TZmqClient : TTransport | |
9 | { | |
10 | Socket _sock; | |
11 | String _endpoint; | |
12 | MemoryStream _wbuf = new MemoryStream (); | |
13 | MemoryStream _rbuf = new MemoryStream (); | |
14 | ||
15 | void debug (string msg) | |
16 | { | |
17 | //Uncomment to enable debug | |
18 | // Console.WriteLine (msg); | |
19 | } | |
20 | ||
21 | public TZmqClient (Context ctx, String endpoint, SocketType sockType) | |
22 | { | |
23 | _sock = ctx.Socket (sockType); | |
24 | _endpoint = endpoint; | |
25 | } | |
26 | ||
27 | public override void Open () | |
28 | { | |
29 | _sock.Connect (_endpoint); | |
30 | } | |
31 | ||
32 | public override void Close () | |
33 | { | |
34 | throw new NotImplementedException (); | |
35 | } | |
36 | ||
37 | public override bool IsOpen { | |
38 | get { | |
39 | throw new NotImplementedException (); | |
40 | } | |
41 | } | |
42 | ||
43 | public override int Read (byte[] buf, int off, int len) | |
44 | { | |
45 | debug ("Client_Read"); | |
46 | if (off != 0 || len != buf.Length) | |
47 | throw new NotImplementedException (); | |
48 | ||
49 | if (_rbuf.Length == 0) { | |
50 | //Fill the Buffer with the complete ZMQ Message which needs to be(?!) the complete Thrift response | |
51 | debug ("Client_Read Filling buffer.."); | |
52 | byte[] tmpBuf = _sock.Recv (); | |
53 | debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length)); | |
54 | _rbuf.Write (tmpBuf, 0, tmpBuf.Length); | |
55 | _rbuf.Position = 0; //For reading | |
56 | } | |
57 | int ret = _rbuf.Read (buf, 0, len); | |
58 | if (_rbuf.Length == _rbuf.Position) //Finished reading | |
59 | _rbuf.SetLength (0); | |
60 | debug (string.Format ("Client_Read return {0}b, remaining {1}b", ret, _rbuf.Length - _rbuf.Position)); | |
61 | return ret; | |
62 | } | |
63 | ||
64 | public override void Write (byte[] buf, int off, int len) | |
65 | { | |
66 | debug ("Client_Write"); | |
67 | _wbuf.Write (buf, off, len); | |
68 | } | |
69 | ||
70 | public override void Flush () | |
71 | { | |
72 | debug ("Client_Flush"); | |
73 | _sock.Send (_wbuf.GetBuffer ()); | |
74 | _wbuf = new MemoryStream (); | |
75 | } | |
76 | } | |
77 | } | |
78 |