1 // Copyright Oliver Kowalke 2015.
2 // Distributed under the Boost Software License, Version 1.0.
3 // (See accompanying file LICENSE_1_0.txt or copy at
4 // http://www.boost.org/LICENSE_1_0.txt)
13 #include <boost/asio.hpp>
14 #include <boost/bind.hpp>
15 #include <boost/enable_shared_from_this.hpp>
16 #include <boost/foreach.hpp>
17 #include <boost/make_shared.hpp>
18 #include <boost/ref.hpp>
19 #include <boost/shared_ptr.hpp>
20 #include <boost/utility.hpp>
22 #include <boost/fiber/all.hpp>
23 #include "../round_robin.hpp"
24 #include "../yield.hpp"
26 using boost::asio::ip::tcp
;
// size (in bytes) of the fixed-size message buffers used by the sessions
const std::size_t max_length = 1024;
30 class subscriber_session
;
31 typedef boost::shared_ptr
<subscriber_session
> subscriber_session_ptr
;
33 // a channel has n subscribers (subscriptions)
34 // this class holds a list of subscribers for one channel
39 // subscribe to this channel
40 void subscribe( subscriber_session_ptr
const& s
) {
41 subscribers_
.insert( s
);
44 // unsubscribe from this channel
45 void unsubscribe( subscriber_session_ptr
const& s
) {
46 subscribers_
.erase(s
);
// publish a message, i.e. push this message to all subscribers
// (declared here, defined out of line as subscriptions::publish)
void publish( std::string const& msg);
53 // list of subscribers
54 std::set
< subscriber_session_ptr
> subscribers_
;
57 // a class to register channels and to subsribe clients to this channels
58 class registry
: private boost::noncopyable
{
60 typedef std::map
< std::string
, boost::shared_ptr
< subscriptions
> > channels_cont
;
61 typedef channels_cont::iterator channels_iter
;
63 boost::fibers::mutex mtx_
;
64 channels_cont channels_
;
66 void register_channel_( std::string
const& channel
) {
67 if ( channels_
.end() != channels_
.find( channel
) ) {
68 throw std::runtime_error("channel already exists");
70 channels_
[channel
] = boost::make_shared
< subscriptions
>();
71 std::cout
<< "new channel '" << channel
<< "' registered" << std::endl
;
74 void unregister_channel_( std::string
const& channel
) {
75 channels_
.erase( channel
);
76 std::cout
<< "channel '" << channel
<< "' unregistered" << std::endl
;
79 void subscribe_( std::string
const& channel
, subscriber_session_ptr s
) {
80 channels_iter iter
= channels_
.find( channel
);
81 if ( channels_
.end() == iter
) {
82 throw std::runtime_error("channel does not exist");
84 iter
->second
->subscribe( s
);
85 std::cout
<< "new subscription to channel '" << channel
<< "'" << std::endl
;
88 void unsubscribe_( std::string
const& channel
, subscriber_session_ptr s
) {
89 channels_iter iter
= channels_
.find( channel
);
90 if ( channels_
.end() != iter
) {
91 iter
->second
->unsubscribe( s
);
95 void publish_( std::string
const& channel
, std::string
const& msg
) {
96 channels_iter iter
= channels_
.find( channel
);
97 if ( channels_
.end() == iter
) {
98 throw std::runtime_error("channel does not exist");
100 iter
->second
->publish( msg
);
101 std::cout
<< "message '" << msg
<< "' to publish on channel '" << channel
<< "'" << std::endl
;
105 // add a channel to registry
106 void register_channel( std::string
const& channel
) {
107 std::unique_lock
< boost::fibers::mutex
> lk( mtx_
);
108 register_channel_( channel
);
111 // remove a channel from registry
112 void unregister_channel( std::string
const& channel
) {
113 std::unique_lock
< boost::fibers::mutex
> lk( mtx_
);
114 unregister_channel_( channel
);
117 // subscribe to a channel
118 void subscribe( std::string
const& channel
, subscriber_session_ptr s
) {
119 std::unique_lock
< boost::fibers::mutex
> lk( mtx_
);
120 subscribe_( channel
, s
);
123 // unsubscribe from a channel
124 void unsubscribe( std::string
const& channel
, subscriber_session_ptr s
) {
125 std::unique_lock
< boost::fibers::mutex
> lk( mtx_
);
126 unsubscribe_( channel
, s
);
129 // publish a message to all subscribers registerd to the channel
130 void publish( std::string
const& channel
, std::string
const& msg
) {
131 std::unique_lock
< boost::fibers::mutex
> lk( mtx_
);
132 publish_( channel
, msg
);
136 // a subscriber subscribes to a given channel in order to receive messages published on this channel
137 class subscriber_session
: public boost::enable_shared_from_this
< subscriber_session
> {
139 explicit subscriber_session( boost::asio::io_service
& io_service
, registry
& reg
) :
140 socket_( io_service
),
144 tcp::socket
& socket() {
148 // this function is executed inside the fiber
152 boost::system::error_code ec
;
153 // read first message == channel name
154 // async_read() returns once the complete message has been read;
155 // until then the fiber stays suspended while the message
156 // is read into the given buffer 'data_'
157 boost::asio::async_read(
159 boost::asio::buffer( data_
),
160 boost::fibers::asio::yield
[ec
]);
162 throw std::runtime_error("no channel from subscriber");
164 // first message is equal to the channel name the publisher
167 // subscribe to new channel
168 reg_
.subscribe( channel
, shared_from_this() );
169 // read published messages
171 // wait on a condition variable for new messages
172 // the fiber will be suspended until the condition
173 // becomes true and the fiber is resumed
174 // published message is stored in buffer 'data_'
175 std::unique_lock
< boost::fibers::mutex
> lk( mtx_
);
177 std::string
data( data_
);
179 std::cout
<< "subscriber::run(): '" << data
<< std::endl
;
180 // message '<fini>' terminates subscription
181 if ( "<fini>" == data
) {
184 // async. write message to socket connected with
186 // async_write() returns once the complete message has been written
187 // the fiber is suspended in the meantime
188 boost::asio::async_write(
190 boost::asio::buffer( data
, data
.size() ),
191 boost::fibers::asio::yield
[ec
]);
192 if ( ec
== boost::asio::error::eof
) {
193 break; //connection closed cleanly by peer
195 throw boost::system::system_error( ec
); //some other error
197 std::cout
<< "subscriber::run(): '" << data
<< "' written" << std::endl
;
199 } catch ( std::exception
const& e
) {
200 std::cerr
<< "subscriber [" << channel
<< "] failed: " << e
.what() << std::endl
;
204 // unsubscribe from channel
205 reg_
.unsubscribe( channel
, shared_from_this() );
208 // called from publisher_session (running in other fiber)
209 void publish( std::string
const& msg
) {
210 std::unique_lock
< boost::fibers::mutex
> lk( mtx_
);
211 std::memset( data_
, '\0', sizeof( data_
));
212 std::memcpy( data_
, msg
.c_str(), (std::min
)(max_length
, msg
.size()));
219 boost::fibers::mutex mtx_
;
220 boost::fibers::condition_variable cond_
;
221 // fixed size message
222 char data_
[max_length
];
226 subscriptions::~subscriptions() {
227 BOOST_FOREACH( subscriber_session_ptr s
, subscribers_
) {
228 s
->publish("<fini>");
233 subscriptions::publish( std::string
const& msg
) {
234 BOOST_FOREACH( subscriber_session_ptr s
, subscribers_
) {
239 // a publisher publishes messages on its channel
240 // subscribers may subscribe to this channel to receive the published messages
241 class publisher_session
: public boost::enable_shared_from_this
< publisher_session
> {
243 explicit publisher_session( boost::asio::io_service
& io_service
, registry
& reg
) :
244 socket_( io_service
),
248 tcp::socket
& socket() {
252 // this function is executed inside the fiber
256 boost::system::error_code ec
;
257 // fixed size message
258 char data
[max_length
];
259 // read first message == channel name
260 // async_read() returns once the complete message has been read;
261 // until then the fiber stays suspended while the message
262 // is read into the given buffer 'data'
263 boost::asio::async_read(
265 boost::asio::buffer( data
),
266 boost::fibers::asio::yield
[ec
]);
268 throw std::runtime_error("no channel from publisher");
270 // first message is equal to the channel name the publisher
273 // register the new channel
274 reg_
.register_channel( channel
);
275 // start publishing messages
277 // read message from publisher asynchronously
278 // async_read() suspends this fiber until the complete message is read
279 // and stored in the given buffer 'data'
280 boost::asio::async_read(
282 boost::asio::buffer( data
),
283 boost::fibers::asio::yield
[ec
]);
284 if ( ec
== boost::asio::error::eof
) {
285 break; //connection closed cleanly by peer
287 throw boost::system::system_error( ec
); //some other error
289 // publish message to all subscribers
290 reg_
.publish( channel
, std::string( data
) );
292 } catch ( std::exception
const& e
) {
293 std::cerr
<< "publisher [" << channel
<< "] failed: " << e
.what() << std::endl
;
297 // unregister channel
298 reg_
.unregister_channel( channel
);
306 typedef boost::shared_ptr
< publisher_session
> publisher_session_ptr
;
308 // function accepts connection requests from clients acting as a publisher
309 void accept_publisher( boost::asio::io_service
& io_service
,
312 // create TCP-acceptor
313 tcp::acceptor
acceptor( io_service
, tcp::endpoint( tcp::v4(), port
) );
314 // loop for accepting connection requests
316 boost::system::error_code ec
;
317 // create new publisher-session
318 // this instance will be associated with one publisher
319 publisher_session_ptr new_publisher_session
=
320 boost::make_shared
<publisher_session
>( boost::ref( io_service
), boost::ref( reg
) );
321 // async. accept of new connection request
322 // this function will suspend this execution context (fiber) until a
323 // connection was established, after returning from this function a new client (publisher)
325 acceptor
.async_accept(
326 new_publisher_session
->socket(),
327 boost::fibers::asio::yield
[ec
]);
329 // run the new publisher in its own fiber (one fiber for one client)
330 boost::fibers::fiber(
331 boost::bind( & publisher_session::run
, new_publisher_session
) ).detach();
336 // function accepts connection requests from clients acting as a subscriber
337 void accept_subscriber( boost::asio::io_service
& io_service
,
340 // create TCP-acceptor
341 tcp::acceptor
acceptor( io_service
, tcp::endpoint( tcp::v4(), port
) );
342 // loop for accepting connection requests
344 boost::system::error_code ec
;
345 // create new subscriber-session
346 // this instance will be associated with one subscriber
347 subscriber_session_ptr new_subscriber_session
=
348 boost::make_shared
<subscriber_session
>( boost::ref( io_service
), boost::ref( reg
) );
349 // async. accept of new connection request
350 // this function will suspend this execution context (fiber) until a
351 // connection was established, after returning from this function a new client (subscriber)
353 acceptor
.async_accept(
354 new_subscriber_session
->socket(),
355 boost::fibers::asio::yield
[ec
]);
357 // run the new subscriber in its own fiber (one fiber for one client)
358 boost::fibers::fiber(
359 boost::bind( & subscriber_session::run
, new_subscriber_session
) ).detach();
365 int main( int argc
, char* argv
[]) {
367 // create io_service for async. I/O
368 boost::asio::io_service io_service
;
369 // register asio scheduler
370 boost::fibers::use_scheduling_algorithm
< boost::fibers::asio::round_robin
>( io_service
);
371 // registry for channels and their subscriptions
373 // create an acceptor for publishers, run it as fiber
374 boost::fibers::fiber(
375 accept_publisher
, boost::ref( io_service
), 9997, boost::ref( reg
) ).detach();
376 // create an acceptor for subscribers, run it as fiber
377 boost::fibers::fiber(
378 accept_subscriber
, boost::ref( io_service
), 9998, boost::ref( reg
) ).detach();
382 } catch ( std::exception
const& e
) {
383 std::cerr
<< "Exception: " << e
.what() << "\n";