]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright Oliver Kowalke 2015. |
2 | // Distributed under the Boost Software License, Version 1.0. | |
3 | // (See accompanying file LICENSE_1_0.txt or copy at | |
4 | // http://www.boost.org/LICENSE_1_0.txt) | |
5 | ||
#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <stdexcept>
#include <string>

#include <boost/asio.hpp>
#include <boost/utility.hpp>

#include <boost/fiber/all.hpp>
#include "../round_robin.hpp"
#include "../yield.hpp"
20 | ||
21 | using boost::asio::ip::tcp; | |
22 | ||
23 | const std::size_t max_length = 1024; | |
24 | ||
25 | class subscriber_session; | |
b32b8144 | 26 | typedef std::shared_ptr< subscriber_session > subscriber_session_ptr; |
7c673cae | 27 | |
b32b8144 FG |
28 | // a queue has n subscribers (subscriptions) |
29 | // this class holds a list of subcribers for one queue | |
7c673cae FG |
30 | class subscriptions { |
31 | public: | |
32 | ~subscriptions(); | |
33 | ||
b32b8144 | 34 | // subscribe to this queue |
7c673cae FG |
35 | void subscribe( subscriber_session_ptr const& s) { |
36 | subscribers_.insert( s); | |
37 | } | |
38 | ||
b32b8144 | 39 | // unsubscribe from this queue |
7c673cae FG |
40 | void unsubscribe( subscriber_session_ptr const& s) { |
41 | subscribers_.erase(s); | |
42 | } | |
43 | ||
44 | // publish a message, e.g. push this message to all subscribers | |
45 | void publish( std::string const& msg); | |
46 | ||
47 | private: | |
48 | // list of subscribers | |
49 | std::set< subscriber_session_ptr > subscribers_; | |
50 | }; | |
51 | ||
b32b8144 | 52 | // a class to register queues and to subsribe clients to this queues |
7c673cae FG |
53 | class registry : private boost::noncopyable { |
54 | private: | |
b32b8144 FG |
55 | typedef std::map< std::string, std::shared_ptr< subscriptions > > queues_cont; |
56 | typedef queues_cont::iterator queues_iter; | |
7c673cae FG |
57 | |
58 | boost::fibers::mutex mtx_; | |
b32b8144 | 59 | queues_cont queues_; |
7c673cae | 60 | |
b32b8144 FG |
61 | void register_queue_( std::string const& queue) { |
62 | if ( queues_.end() != queues_.find( queue) ) { | |
63 | throw std::runtime_error("queue already exists"); | |
7c673cae | 64 | } |
b32b8144 FG |
65 | queues_[queue] = std::make_shared< subscriptions >(); |
66 | std::cout << "new queue '" << queue << "' registered" << std::endl; | |
7c673cae FG |
67 | } |
68 | ||
b32b8144 FG |
69 | void unregister_queue_( std::string const& queue) { |
70 | queues_.erase( queue); | |
71 | std::cout << "queue '" << queue << "' unregistered" << std::endl; | |
7c673cae FG |
72 | } |
73 | ||
b32b8144 FG |
74 | void subscribe_( std::string const& queue, subscriber_session_ptr s) { |
75 | queues_iter iter = queues_.find( queue); | |
76 | if ( queues_.end() == iter ) { | |
77 | throw std::runtime_error("queue does not exist"); | |
7c673cae FG |
78 | } |
79 | iter->second->subscribe( s); | |
b32b8144 | 80 | std::cout << "new subscription to queue '" << queue << "'" << std::endl; |
7c673cae FG |
81 | } |
82 | ||
b32b8144 FG |
83 | void unsubscribe_( std::string const& queue, subscriber_session_ptr s) { |
84 | queues_iter iter = queues_.find( queue); | |
85 | if ( queues_.end() != iter ) { | |
7c673cae FG |
86 | iter->second->unsubscribe( s); |
87 | } | |
88 | } | |
89 | ||
b32b8144 FG |
90 | void publish_( std::string const& queue, std::string const& msg) { |
91 | queues_iter iter = queues_.find( queue); | |
92 | if ( queues_.end() == iter ) { | |
93 | throw std::runtime_error("queue does not exist"); | |
7c673cae FG |
94 | } |
95 | iter->second->publish( msg); | |
b32b8144 | 96 | std::cout << "message '" << msg << "' to publish on queue '" << queue << "'" << std::endl; |
7c673cae FG |
97 | } |
98 | ||
99 | public: | |
b32b8144 FG |
100 | // add a queue to registry |
101 | void register_queue( std::string const& queue) { | |
7c673cae | 102 | std::unique_lock< boost::fibers::mutex > lk( mtx_); |
b32b8144 | 103 | register_queue_( queue); |
7c673cae FG |
104 | } |
105 | ||
b32b8144 FG |
106 | // remove a queue from registry |
107 | void unregister_queue( std::string const& queue) { | |
7c673cae | 108 | std::unique_lock< boost::fibers::mutex > lk( mtx_); |
b32b8144 | 109 | unregister_queue_( queue); |
7c673cae FG |
110 | } |
111 | ||
b32b8144 FG |
112 | // subscribe to a queue |
113 | void subscribe( std::string const& queue, subscriber_session_ptr s) { | |
7c673cae | 114 | std::unique_lock< boost::fibers::mutex > lk( mtx_); |
b32b8144 | 115 | subscribe_( queue, s); |
7c673cae FG |
116 | } |
117 | ||
b32b8144 FG |
118 | // unsubscribe from a queue |
119 | void unsubscribe( std::string const& queue, subscriber_session_ptr s) { | |
7c673cae | 120 | std::unique_lock< boost::fibers::mutex > lk( mtx_); |
b32b8144 | 121 | unsubscribe_( queue, s); |
7c673cae FG |
122 | } |
123 | ||
b32b8144 FG |
124 | // publish a message to all subscribers registerd to the queue |
125 | void publish( std::string const& queue, std::string const& msg) { | |
7c673cae | 126 | std::unique_lock< boost::fibers::mutex > lk( mtx_); |
b32b8144 | 127 | publish_( queue, msg); |
7c673cae FG |
128 | } |
129 | }; | |
130 | ||
b32b8144 FG |
131 | // a subscriber subscribes to a given queue in order to receive messages published on this queue |
132 | class subscriber_session : public std::enable_shared_from_this< subscriber_session > { | |
7c673cae | 133 | public: |
b32b8144 FG |
134 | explicit subscriber_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) : |
135 | socket_( * io_service), | |
7c673cae FG |
136 | reg_( reg) { |
137 | } | |
138 | ||
139 | tcp::socket& socket() { | |
140 | return socket_; | |
141 | } | |
142 | ||
143 | // this function is executed inside the fiber | |
144 | void run() { | |
b32b8144 | 145 | std::string queue; |
7c673cae FG |
146 | try { |
147 | boost::system::error_code ec; | |
b32b8144 | 148 | // read first message == queue name |
7c673cae FG |
149 | // async_ready() returns if the the complete message is read |
150 | // until this the fiber is suspended until the complete message | |
151 | // is read int the given buffer 'data' | |
152 | boost::asio::async_read( | |
153 | socket_, | |
154 | boost::asio::buffer( data_), | |
155 | boost::fibers::asio::yield[ec]); | |
156 | if ( ec) { | |
b32b8144 | 157 | throw std::runtime_error("no queue from subscriber"); |
7c673cae | 158 | } |
b32b8144 | 159 | // first message ist equal to the queue name the publisher |
7c673cae | 160 | // publishes to |
b32b8144 FG |
161 | queue = data_; |
162 | // subscribe to new queue | |
163 | reg_.subscribe( queue, shared_from_this() ); | |
7c673cae FG |
164 | // read published messages |
165 | for (;;) { | |
166 | // wait for a conditon-variable for new messages | |
167 | // the fiber will be suspended until the condtion | |
168 | // becomes true and the fiber is resumed | |
169 | // published message is stored in buffer 'data_' | |
170 | std::unique_lock< boost::fibers::mutex > lk( mtx_); | |
171 | cond_.wait( lk); | |
172 | std::string data( data_); | |
173 | lk.unlock(); | |
7c673cae FG |
174 | // message '<fini>' terminates subscription |
175 | if ( "<fini>" == data) { | |
176 | break; | |
177 | } | |
178 | // async. write message to socket connected with | |
179 | // subscriber | |
180 | // async_write() returns if the complete message was writen | |
181 | // the fiber is suspended in the meanwhile | |
182 | boost::asio::async_write( | |
183 | socket_, | |
184 | boost::asio::buffer( data, data.size() ), | |
185 | boost::fibers::asio::yield[ec]); | |
186 | if ( ec == boost::asio::error::eof) { | |
187 | break; //connection closed cleanly by peer | |
188 | } else if ( ec) { | |
189 | throw boost::system::system_error( ec); //some other error | |
190 | } | |
191 | std::cout << "subscriber::run(): '" << data << "' written" << std::endl; | |
192 | } | |
193 | } catch ( std::exception const& e) { | |
b32b8144 | 194 | std::cerr << "subscriber [" << queue << "] failed: " << e.what() << std::endl; |
7c673cae FG |
195 | } |
196 | // close socket | |
197 | socket_.close(); | |
b32b8144 FG |
198 | // unregister queue |
199 | reg_.unsubscribe( queue, shared_from_this() ); | |
7c673cae FG |
200 | } |
201 | ||
202 | // called from publisher_session (running in other fiber) | |
203 | void publish( std::string const& msg) { | |
204 | std::unique_lock< boost::fibers::mutex > lk( mtx_); | |
205 | std::memset( data_, '\0', sizeof( data_)); | |
206 | std::memcpy( data_, msg.c_str(), (std::min)(max_length, msg.size())); | |
207 | cond_.notify_one(); | |
208 | } | |
209 | ||
210 | private: | |
211 | tcp::socket socket_; | |
212 | registry & reg_; | |
213 | boost::fibers::mutex mtx_; | |
214 | boost::fibers::condition_variable cond_; | |
215 | // fixed size message | |
216 | char data_[max_length]; | |
217 | }; | |
218 | ||
219 | ||
220 | subscriptions::~subscriptions() { | |
b32b8144 | 221 | for ( subscriber_session_ptr s : subscribers_) { |
7c673cae FG |
222 | s->publish("<fini>"); |
223 | } | |
224 | } | |
225 | ||
226 | void | |
227 | subscriptions::publish( std::string const& msg) { | |
b32b8144 | 228 | for ( subscriber_session_ptr s : subscribers_) { |
7c673cae FG |
229 | s->publish( msg); |
230 | } | |
231 | } | |
232 | ||
b32b8144 FG |
233 | // a publisher publishes messages on its queue |
234 | // subscriber might register to this queue to get the published messages | |
235 | class publisher_session : public std::enable_shared_from_this< publisher_session > { | |
7c673cae | 236 | public: |
b32b8144 FG |
237 | explicit publisher_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) : |
238 | socket_( * io_service), | |
7c673cae FG |
239 | reg_( reg) { |
240 | } | |
241 | ||
242 | tcp::socket& socket() { | |
243 | return socket_; | |
244 | } | |
245 | ||
246 | // this function is executed inside the fiber | |
247 | void run() { | |
b32b8144 | 248 | std::string queue; |
7c673cae FG |
249 | try { |
250 | boost::system::error_code ec; | |
251 | // fixed size message | |
252 | char data[max_length]; | |
b32b8144 | 253 | // read first message == queue name |
7c673cae FG |
254 | // async_ready() returns if the the complete message is read |
255 | // until this the fiber is suspended until the complete message | |
256 | // is read int the given buffer 'data' | |
257 | boost::asio::async_read( | |
258 | socket_, | |
259 | boost::asio::buffer( data), | |
260 | boost::fibers::asio::yield[ec]); | |
261 | if ( ec) { | |
b32b8144 | 262 | throw std::runtime_error("no queue from publisher"); |
7c673cae | 263 | } |
b32b8144 | 264 | // first message ist equal to the queue name the publisher |
7c673cae | 265 | // publishes to |
b32b8144 FG |
266 | queue = data; |
267 | // register the new queue | |
268 | reg_.register_queue( queue); | |
7c673cae FG |
269 | // start publishing messages |
270 | for (;;) { | |
271 | // read message from publisher asyncronous | |
272 | // async_read() suspends this fiber until the complete emssage is read | |
273 | // and stored in the given buffer 'data' | |
274 | boost::asio::async_read( | |
275 | socket_, | |
276 | boost::asio::buffer( data), | |
277 | boost::fibers::asio::yield[ec]); | |
278 | if ( ec == boost::asio::error::eof) { | |
279 | break; //connection closed cleanly by peer | |
280 | } else if ( ec) { | |
281 | throw boost::system::system_error( ec); //some other error | |
282 | } | |
283 | // publish message to all subscribers | |
b32b8144 | 284 | reg_.publish( queue, std::string( data) ); |
7c673cae FG |
285 | } |
286 | } catch ( std::exception const& e) { | |
b32b8144 | 287 | std::cerr << "publisher [" << queue << "] failed: " << e.what() << std::endl; |
7c673cae FG |
288 | } |
289 | // close socket | |
290 | socket_.close(); | |
b32b8144 FG |
291 | // unregister queue |
292 | reg_.unregister_queue( queue); | |
7c673cae FG |
293 | } |
294 | ||
295 | private: | |
296 | tcp::socket socket_; | |
297 | registry & reg_; | |
298 | }; | |
299 | ||
b32b8144 | 300 | typedef std::shared_ptr< publisher_session > publisher_session_ptr; |
7c673cae FG |
301 | |
302 | // function accepts connections requests from clients acting as a publisher | |
b32b8144 | 303 | void accept_publisher( std::shared_ptr< boost::asio::io_service > const& io_service, |
7c673cae FG |
304 | unsigned short port, |
305 | registry & reg) { | |
306 | // create TCP-acceptor | |
b32b8144 | 307 | tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) ); |
7c673cae FG |
308 | // loop for accepting connection requests |
309 | for (;;) { | |
310 | boost::system::error_code ec; | |
311 | // create new publisher-session | |
312 | // this instance will be associated with one publisher | |
313 | publisher_session_ptr new_publisher_session = | |
b32b8144 | 314 | std::make_shared< publisher_session >( io_service, std::ref( reg) ); |
7c673cae FG |
315 | // async. accept of new connection request |
316 | // this function will suspend this execution context (fiber) until a | |
317 | // connection was established, after returning from this function a new client (publisher) | |
318 | // is connected | |
319 | acceptor.async_accept( | |
320 | new_publisher_session->socket(), | |
321 | boost::fibers::asio::yield[ec]); | |
322 | if ( ! ec) { | |
323 | // run the new publisher in its own fiber (one fiber for one client) | |
324 | boost::fibers::fiber( | |
b32b8144 | 325 | std::bind( & publisher_session::run, new_publisher_session) ).detach(); |
7c673cae FG |
326 | } |
327 | } | |
328 | } | |
329 | ||
330 | // function accepts connections requests from clients acting as a subscriber | |
b32b8144 | 331 | void accept_subscriber( std::shared_ptr< boost::asio::io_service > const& io_service, |
7c673cae FG |
332 | unsigned short port, |
333 | registry & reg) { | |
334 | // create TCP-acceptor | |
b32b8144 | 335 | tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) ); |
7c673cae FG |
336 | // loop for accepting connection requests |
337 | for (;;) { | |
338 | boost::system::error_code ec; | |
339 | // create new subscriber-session | |
340 | // this instance will be associated with one subscriber | |
341 | subscriber_session_ptr new_subscriber_session = | |
b32b8144 | 342 | std::make_shared< subscriber_session >( io_service, std::ref( reg) ); |
7c673cae FG |
343 | // async. accept of new connection request |
344 | // this function will suspend this execution context (fiber) until a | |
345 | // connection was established, after returning from this function a new client (subscriber) | |
346 | // is connected | |
347 | acceptor.async_accept( | |
348 | new_subscriber_session->socket(), | |
349 | boost::fibers::asio::yield[ec]); | |
350 | if ( ! ec) { | |
351 | // run the new subscriber in its own fiber (one fiber for one client) | |
352 | boost::fibers::fiber( | |
b32b8144 | 353 | std::bind( & subscriber_session::run, new_subscriber_session) ).detach(); |
7c673cae FG |
354 | } |
355 | } | |
356 | } | |
357 | ||
358 | ||
359 | int main( int argc, char* argv[]) { | |
360 | try { | |
361 | // create io_service for async. I/O | |
b32b8144 | 362 | std::shared_ptr< boost::asio::io_service > io_service = std::make_shared< boost::asio::io_service >(); |
7c673cae FG |
363 | // register asio scheduler |
364 | boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service); | |
b32b8144 | 365 | // registry for queues and its subscription |
7c673cae FG |
366 | registry reg; |
367 | // create an acceptor for publishers, run it as fiber | |
368 | boost::fibers::fiber( | |
b32b8144 | 369 | accept_publisher, std::ref( io_service), 9997, std::ref( reg) ).detach(); |
7c673cae FG |
370 | // create an acceptor for subscribers, run it as fiber |
371 | boost::fibers::fiber( | |
b32b8144 | 372 | accept_subscriber, std::ref( io_service), 9998, std::ref( reg) ).detach(); |
7c673cae | 373 | // dispatch |
b32b8144 | 374 | io_service->run(); |
7c673cae FG |
375 | return EXIT_SUCCESS; |
376 | } catch ( std::exception const& e) { | |
377 | std::cerr << "Exception: " << e.what() << "\n"; | |
378 | } | |
379 | ||
380 | return EXIT_FAILURE; | |
381 | } |