1 // Copyright Oliver Kowalke 2015.
2 // Distributed under the Boost Software License, Version 1.0.
3 // (See accompanying file LICENSE_1_0.txt or copy at
4 // http://www.boost.org/LICENSE_1_0.txt)
14 #include <boost/asio.hpp>
15 #include <boost/utility.hpp>
17 #include <boost/fiber/all.hpp>
18 #include "../round_robin.hpp"
19 #include "../yield.hpp"
21 using boost::asio::ip::tcp
;
23 const std::size_t max_length
= 1024;
25 class subscriber_session
;
26 typedef std::shared_ptr
< subscriber_session
> subscriber_session_ptr
;
28 // a queue has n subscribers (subscriptions)
29 // this class holds a list of subcribers for one queue
34 // subscribe to this queue
35 void subscribe( subscriber_session_ptr
const& s
) {
36 subscribers_
.insert( s
);
39 // unsubscribe from this queue
40 void unsubscribe( subscriber_session_ptr
const& s
) {
41 subscribers_
.erase(s
);
44 // publish a message, e.g. push this message to all subscribers
45 void publish( std::string
const& msg
);
48 // list of subscribers
49 std::set
< subscriber_session_ptr
> subscribers_
;
52 // a class to register queues and to subsribe clients to this queues
53 class registry
: private boost::noncopyable
{
55 typedef std::map
< std::string
, std::shared_ptr
< subscriptions
> > queues_cont
;
56 typedef queues_cont::iterator queues_iter
;
58 boost::fibers::mutex mtx_
;
61 void register_queue_( std::string
const& queue
) {
62 if ( queues_
.end() != queues_
.find( queue
) ) {
63 throw std::runtime_error("queue already exists");
65 queues_
[queue
] = std::make_shared
< subscriptions
>();
66 std::cout
<< "new queue '" << queue
<< "' registered" << std::endl
;
69 void unregister_queue_( std::string
const& queue
) {
70 queues_
.erase( queue
);
71 std::cout
<< "queue '" << queue
<< "' unregistered" << std::endl
;
74 void subscribe_( std::string
const& queue
, subscriber_session_ptr s
) {
75 queues_iter iter
= queues_
.find( queue
);
76 if ( queues_
.end() == iter
) {
77 throw std::runtime_error("queue does not exist");
79 iter
->second
->subscribe( s
);
80 std::cout
<< "new subscription to queue '" << queue
<< "'" << std::endl
;
83 void unsubscribe_( std::string
const& queue
, subscriber_session_ptr s
) {
84 queues_iter iter
= queues_
.find( queue
);
85 if ( queues_
.end() != iter
) {
86 iter
->second
->unsubscribe( s
);
90 void publish_( std::string
const& queue
, std::string
const& msg
) {
91 queues_iter iter
= queues_
.find( queue
);
92 if ( queues_
.end() == iter
) {
93 throw std::runtime_error("queue does not exist");
95 iter
->second
->publish( msg
);
96 std::cout
<< "message '" << msg
<< "' to publish on queue '" << queue
<< "'" << std::endl
;
100 // add a queue to registry
101 void register_queue( std::string
const& queue
) {
102 std::unique_lock
< boost::fibers::mutex
> lk( mtx_
);
103 register_queue_( queue
);
106 // remove a queue from registry
107 void unregister_queue( std::string
const& queue
) {
108 std::unique_lock
< boost::fibers::mutex
> lk( mtx_
);
109 unregister_queue_( queue
);
112 // subscribe to a queue
113 void subscribe( std::string
const& queue
, subscriber_session_ptr s
) {
114 std::unique_lock
< boost::fibers::mutex
> lk( mtx_
);
115 subscribe_( queue
, s
);
118 // unsubscribe from a queue
119 void unsubscribe( std::string
const& queue
, subscriber_session_ptr s
) {
120 std::unique_lock
< boost::fibers::mutex
> lk( mtx_
);
121 unsubscribe_( queue
, s
);
124 // publish a message to all subscribers registerd to the queue
125 void publish( std::string
const& queue
, std::string
const& msg
) {
126 std::unique_lock
< boost::fibers::mutex
> lk( mtx_
);
127 publish_( queue
, msg
);
131 // a subscriber subscribes to a given queue in order to receive messages published on this queue
132 class subscriber_session
: public std::enable_shared_from_this
< subscriber_session
> {
134 explicit subscriber_session( std::shared_ptr
< boost::asio::io_service
> const& io_service
, registry
& reg
) :
135 socket_( * io_service
),
139 tcp::socket
& socket() {
143 // this function is executed inside the fiber
147 boost::system::error_code ec
;
148 // read first message == queue name
149 // async_ready() returns if the the complete message is read
150 // until this the fiber is suspended until the complete message
151 // is read int the given buffer 'data'
152 boost::asio::async_read(
154 boost::asio::buffer( data_
),
155 boost::fibers::asio::yield
[ec
]);
157 throw std::runtime_error("no queue from subscriber");
159 // first message ist equal to the queue name the publisher
162 // subscribe to new queue
163 reg_
.subscribe( queue
, shared_from_this() );
164 // read published messages
166 // wait for a conditon-variable for new messages
167 // the fiber will be suspended until the condtion
168 // becomes true and the fiber is resumed
169 // published message is stored in buffer 'data_'
170 std::unique_lock
< boost::fibers::mutex
> lk( mtx_
);
172 std::string
data( data_
);
174 // message '<fini>' terminates subscription
175 if ( "<fini>" == data
) {
178 // async. write message to socket connected with
180 // async_write() returns if the complete message was writen
181 // the fiber is suspended in the meanwhile
182 boost::asio::async_write(
184 boost::asio::buffer( data
, data
.size() ),
185 boost::fibers::asio::yield
[ec
]);
186 if ( ec
== boost::asio::error::eof
) {
187 break; //connection closed cleanly by peer
189 throw boost::system::system_error( ec
); //some other error
191 std::cout
<< "subscriber::run(): '" << data
<< "' written" << std::endl
;
193 } catch ( std::exception
const& e
) {
194 std::cerr
<< "subscriber [" << queue
<< "] failed: " << e
.what() << std::endl
;
199 reg_
.unsubscribe( queue
, shared_from_this() );
202 // called from publisher_session (running in other fiber)
203 void publish( std::string
const& msg
) {
204 std::unique_lock
< boost::fibers::mutex
> lk( mtx_
);
205 std::memset( data_
, '\0', sizeof( data_
));
206 std::memcpy( data_
, msg
.c_str(), (std::min
)(max_length
, msg
.size()));
213 boost::fibers::mutex mtx_
;
214 boost::fibers::condition_variable cond_
;
215 // fixed size message
216 char data_
[max_length
];
220 subscriptions::~subscriptions() {
221 for ( subscriber_session_ptr s
: subscribers_
) {
222 s
->publish("<fini>");
227 subscriptions::publish( std::string
const& msg
) {
228 for ( subscriber_session_ptr s
: subscribers_
) {
233 // a publisher publishes messages on its queue
234 // subscriber might register to this queue to get the published messages
235 class publisher_session
: public std::enable_shared_from_this
< publisher_session
> {
237 explicit publisher_session( std::shared_ptr
< boost::asio::io_service
> const& io_service
, registry
& reg
) :
238 socket_( * io_service
),
242 tcp::socket
& socket() {
246 // this function is executed inside the fiber
250 boost::system::error_code ec
;
251 // fixed size message
252 char data
[max_length
];
253 // read first message == queue name
254 // async_ready() returns if the the complete message is read
255 // until this the fiber is suspended until the complete message
256 // is read int the given buffer 'data'
257 boost::asio::async_read(
259 boost::asio::buffer( data
),
260 boost::fibers::asio::yield
[ec
]);
262 throw std::runtime_error("no queue from publisher");
264 // first message ist equal to the queue name the publisher
267 // register the new queue
268 reg_
.register_queue( queue
);
269 // start publishing messages
271 // read message from publisher asyncronous
272 // async_read() suspends this fiber until the complete emssage is read
273 // and stored in the given buffer 'data'
274 boost::asio::async_read(
276 boost::asio::buffer( data
),
277 boost::fibers::asio::yield
[ec
]);
278 if ( ec
== boost::asio::error::eof
) {
279 break; //connection closed cleanly by peer
281 throw boost::system::system_error( ec
); //some other error
283 // publish message to all subscribers
284 reg_
.publish( queue
, std::string( data
) );
286 } catch ( std::exception
const& e
) {
287 std::cerr
<< "publisher [" << queue
<< "] failed: " << e
.what() << std::endl
;
292 reg_
.unregister_queue( queue
);
300 typedef std::shared_ptr
< publisher_session
> publisher_session_ptr
;
302 // function accepts connections requests from clients acting as a publisher
303 void accept_publisher( std::shared_ptr
< boost::asio::io_service
> const& io_service
,
306 // create TCP-acceptor
307 tcp::acceptor
acceptor( * io_service
, tcp::endpoint( tcp::v4(), port
) );
308 // loop for accepting connection requests
310 boost::system::error_code ec
;
311 // create new publisher-session
312 // this instance will be associated with one publisher
313 publisher_session_ptr new_publisher_session
=
314 std::make_shared
< publisher_session
>( io_service
, std::ref( reg
) );
315 // async. accept of new connection request
316 // this function will suspend this execution context (fiber) until a
317 // connection was established, after returning from this function a new client (publisher)
319 acceptor
.async_accept(
320 new_publisher_session
->socket(),
321 boost::fibers::asio::yield
[ec
]);
323 // run the new publisher in its own fiber (one fiber for one client)
324 boost::fibers::fiber(
325 std::bind( & publisher_session::run
, new_publisher_session
) ).detach();
330 // function accepts connections requests from clients acting as a subscriber
331 void accept_subscriber( std::shared_ptr
< boost::asio::io_service
> const& io_service
,
334 // create TCP-acceptor
335 tcp::acceptor
acceptor( * io_service
, tcp::endpoint( tcp::v4(), port
) );
336 // loop for accepting connection requests
338 boost::system::error_code ec
;
339 // create new subscriber-session
340 // this instance will be associated with one subscriber
341 subscriber_session_ptr new_subscriber_session
=
342 std::make_shared
< subscriber_session
>( io_service
, std::ref( reg
) );
343 // async. accept of new connection request
344 // this function will suspend this execution context (fiber) until a
345 // connection was established, after returning from this function a new client (subscriber)
347 acceptor
.async_accept(
348 new_subscriber_session
->socket(),
349 boost::fibers::asio::yield
[ec
]);
351 // run the new subscriber in its own fiber (one fiber for one client)
352 boost::fibers::fiber(
353 std::bind( & subscriber_session::run
, new_subscriber_session
) ).detach();
359 int main( int argc
, char* argv
[]) {
361 // create io_service for async. I/O
362 std::shared_ptr
< boost::asio::io_service
> io_service
= std::make_shared
< boost::asio::io_service
>();
363 // register asio scheduler
364 boost::fibers::use_scheduling_algorithm
< boost::fibers::asio::round_robin
>( io_service
);
365 // registry for queues and its subscription
367 // create an acceptor for publishers, run it as fiber
368 boost::fibers::fiber(
369 accept_publisher
, std::ref( io_service
), 9997, std::ref( reg
) ).detach();
370 // create an acceptor for subscribers, run it as fiber
371 boost::fibers::fiber(
372 accept_subscriber
, std::ref( io_service
), 9998, std::ref( reg
) ).detach();
376 } catch ( std::exception
const& e
) {
377 std::cerr
<< "Exception: " << e
.what() << "\n";