]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | #include <tr1/functional> |
2 | #include <thrift/protocol/TBinaryProtocol.h> | |
3 | #include <thrift/async/TAsyncProtocolProcessor.h> | |
4 | #include <thrift/async/TEvhttpServer.h> | |
5 | #include <thrift/async/TEvhttpClientChannel.h> | |
6 | #include "Aggr.h" | |
7 | ||
8 | using std::tr1::bind; | |
9 | using std::tr1::placeholders::_1; | |
10 | ||
11 | using apache::thrift::TException; | |
12 | using apache::thrift::protocol::TBinaryProtocolFactory; | |
13 | using apache::thrift::protocol::TProtocolFactory; | |
14 | using apache::thrift::async::TEvhttpServer; | |
15 | using apache::thrift::async::TAsyncProcessor; | |
16 | using apache::thrift::async::TAsyncBufferProcessor; | |
17 | using apache::thrift::async::TAsyncProtocolProcessor; | |
18 | using apache::thrift::async::TAsyncChannel; | |
19 | using apache::thrift::async::TEvhttpClientChannel; | |
20 | ||
21 | class AggrAsyncHandler : public AggrCobSvIf { | |
22 | protected: | |
23 | struct RequestContext { | |
24 | std::tr1::function<void(std::vector<int32_t> const& _return)> cob; | |
25 | std::vector<int32_t> ret; | |
26 | int pending_calls; | |
27 | }; | |
28 | ||
29 | public: | |
30 | AggrAsyncHandler() | |
31 | : eb_(NULL) | |
32 | , pfact_(new TBinaryProtocolFactory()) | |
33 | { | |
34 | leaf_ports_.push_back(8081); | |
35 | leaf_ports_.push_back(8082); | |
36 | } | |
37 | ||
38 | void addValue(std::tr1::function<void()> cob, const int32_t value) { | |
39 | // Silently drop writes to the aggrgator. | |
40 | return cob(); | |
41 | } | |
42 | ||
43 | void getValues(std::tr1::function<void( | |
44 | std::vector<int32_t> const& _return)> cob, | |
45 | std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) { | |
46 | RequestContext* ctx = new RequestContext(); | |
47 | ctx->cob = cob; | |
48 | ctx->pending_calls = leaf_ports_.size(); | |
49 | for (std::vector<int>::iterator it = leaf_ports_.begin(); | |
50 | it != leaf_ports_.end(); ++it) { | |
51 | boost::shared_ptr<TAsyncChannel> channel( | |
52 | new TEvhttpClientChannel( | |
53 | "localhost", "/", "127.0.0.1", *it, eb_)); | |
54 | AggrCobClient* client = new AggrCobClient(channel, pfact_.get()); | |
55 | client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1)); | |
56 | } | |
57 | } | |
58 | ||
59 | void setEventBase(struct event_base* eb) { | |
60 | eb_ = eb; | |
61 | } | |
62 | ||
63 | void clientReturn(RequestContext* ctx, AggrCobClient* client) { | |
64 | ctx->pending_calls -= 1; | |
65 | ||
66 | try { | |
67 | std::vector<int32_t> subret; | |
68 | client->recv_getValues(subret); | |
69 | ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end()); | |
70 | } catch (TException& exn) { | |
71 | // TODO: Log error | |
72 | } | |
73 | ||
74 | delete client; | |
75 | ||
76 | if (ctx->pending_calls == 0) { | |
77 | ctx->cob(ctx->ret); | |
78 | delete ctx; | |
79 | } | |
80 | } | |
81 | ||
82 | protected: | |
83 | struct event_base* eb_; | |
84 | std::vector<int> leaf_ports_; | |
85 | boost::shared_ptr<TProtocolFactory> pfact_; | |
86 | }; | |
87 | ||
88 | ||
89 | int main() { | |
90 | boost::shared_ptr<AggrAsyncHandler> handler(new AggrAsyncHandler()); | |
91 | boost::shared_ptr<TAsyncProcessor> proc(new AggrAsyncProcessor(handler)); | |
92 | boost::shared_ptr<TProtocolFactory> pfact(new TBinaryProtocolFactory()); | |
93 | boost::shared_ptr<TAsyncBufferProcessor> bufproc(new TAsyncProtocolProcessor(proc, pfact)); | |
94 | boost::shared_ptr<TEvhttpServer> server(new TEvhttpServer(bufproc, 8080)); | |
95 | handler->setEventBase(server->getEventBase()); | |
96 | server->serve(); | |
97 | } |