#include <cstddef>
#include <cstdlib>
#include <map>
+#include <memory>
#include <set>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
-#include <boost/bind.hpp>
-#include <boost/enable_shared_from_this.hpp>
-#include <boost/foreach.hpp>
-#include <boost/make_shared.hpp>
-#include <boost/ref.hpp>
-#include <boost/shared_ptr.hpp>
#include <boost/utility.hpp>
#include <boost/fiber/all.hpp>
const std::size_t max_length = 1024;
class subscriber_session;
-typedef boost::shared_ptr<subscriber_session> subscriber_session_ptr;
+typedef std::shared_ptr< subscriber_session > subscriber_session_ptr;
-// a channel has n subscribers (subscriptions)
-// this class holds a list of subcribers for one channel
+// a queue has n subscribers (subscriptions)
+// this class holds a list of subcribers for one queue
class subscriptions {
public:
~subscriptions();
- // subscribe to this channel
+ // subscribe to this queue
void subscribe( subscriber_session_ptr const& s) {
subscribers_.insert( s);
}
- // unsubscribe from this channel
+ // unsubscribe from this queue
void unsubscribe( subscriber_session_ptr const& s) {
subscribers_.erase(s);
}
std::set< subscriber_session_ptr > subscribers_;
};
-// a class to register channels and to subsribe clients to this channels
+// a class to register queues and to subsribe clients to this queues
class registry : private boost::noncopyable {
private:
- typedef std::map< std::string, boost::shared_ptr< subscriptions > > channels_cont;
- typedef channels_cont::iterator channels_iter;
+ typedef std::map< std::string, std::shared_ptr< subscriptions > > queues_cont;
+ typedef queues_cont::iterator queues_iter;
boost::fibers::mutex mtx_;
- channels_cont channels_;
+ queues_cont queues_;
- void register_channel_( std::string const& channel) {
- if ( channels_.end() != channels_.find( channel) ) {
- throw std::runtime_error("channel already exists");
+ void register_queue_( std::string const& queue) {
+ if ( queues_.end() != queues_.find( queue) ) {
+ throw std::runtime_error("queue already exists");
}
- channels_[channel] = boost::make_shared< subscriptions >();
- std::cout << "new channel '" << channel << "' registered" << std::endl;
+ queues_[queue] = std::make_shared< subscriptions >();
+ std::cout << "new queue '" << queue << "' registered" << std::endl;
}
- void unregister_channel_( std::string const& channel) {
- channels_.erase( channel);
- std::cout << "channel '" << channel << "' unregistered" << std::endl;
+ void unregister_queue_( std::string const& queue) {
+ queues_.erase( queue);
+ std::cout << "queue '" << queue << "' unregistered" << std::endl;
}
- void subscribe_( std::string const& channel, subscriber_session_ptr s) {
- channels_iter iter = channels_.find( channel);
- if ( channels_.end() == iter ) {
- throw std::runtime_error("channel does not exist");
+ void subscribe_( std::string const& queue, subscriber_session_ptr s) {
+ queues_iter iter = queues_.find( queue);
+ if ( queues_.end() == iter ) {
+ throw std::runtime_error("queue does not exist");
}
iter->second->subscribe( s);
- std::cout << "new subscription to channel '" << channel << "'" << std::endl;
+ std::cout << "new subscription to queue '" << queue << "'" << std::endl;
}
- void unsubscribe_( std::string const& channel, subscriber_session_ptr s) {
- channels_iter iter = channels_.find( channel);
- if ( channels_.end() != iter ) {
+ void unsubscribe_( std::string const& queue, subscriber_session_ptr s) {
+ queues_iter iter = queues_.find( queue);
+ if ( queues_.end() != iter ) {
iter->second->unsubscribe( s);
}
}
- void publish_( std::string const& channel, std::string const& msg) {
- channels_iter iter = channels_.find( channel);
- if ( channels_.end() == iter ) {
- throw std::runtime_error("channel does not exist");
+ void publish_( std::string const& queue, std::string const& msg) {
+ queues_iter iter = queues_.find( queue);
+ if ( queues_.end() == iter ) {
+ throw std::runtime_error("queue does not exist");
}
iter->second->publish( msg);
- std::cout << "message '" << msg << "' to publish on channel '" << channel << "'" << std::endl;
+ std::cout << "message '" << msg << "' to publish on queue '" << queue << "'" << std::endl;
}
public:
- // add a channel to registry
- void register_channel( std::string const& channel) {
+ // add a queue to registry
+ void register_queue( std::string const& queue) {
std::unique_lock< boost::fibers::mutex > lk( mtx_);
- register_channel_( channel);
+ register_queue_( queue);
}
- // remove a channel from registry
- void unregister_channel( std::string const& channel) {
+ // remove a queue from registry
+ void unregister_queue( std::string const& queue) {
std::unique_lock< boost::fibers::mutex > lk( mtx_);
- unregister_channel_( channel);
+ unregister_queue_( queue);
}
- // subscribe to a channel
- void subscribe( std::string const& channel, subscriber_session_ptr s) {
+ // subscribe to a queue
+ void subscribe( std::string const& queue, subscriber_session_ptr s) {
std::unique_lock< boost::fibers::mutex > lk( mtx_);
- subscribe_( channel, s);
+ subscribe_( queue, s);
}
- // unsubscribe from a channel
- void unsubscribe( std::string const& channel, subscriber_session_ptr s) {
+ // unsubscribe from a queue
+ void unsubscribe( std::string const& queue, subscriber_session_ptr s) {
std::unique_lock< boost::fibers::mutex > lk( mtx_);
- unsubscribe_( channel, s);
+ unsubscribe_( queue, s);
}
- // publish a message to all subscribers registerd to the channel
- void publish( std::string const& channel, std::string const& msg) {
+ // publish a message to all subscribers registerd to the queue
+ void publish( std::string const& queue, std::string const& msg) {
std::unique_lock< boost::fibers::mutex > lk( mtx_);
- publish_( channel, msg);
+ publish_( queue, msg);
}
};
-// a subscriber subscribes to a given channel in order to receive messages published on this channel
-class subscriber_session : public boost::enable_shared_from_this< subscriber_session > {
+// a subscriber subscribes to a given queue in order to receive messages published on this queue
+class subscriber_session : public std::enable_shared_from_this< subscriber_session > {
public:
- explicit subscriber_session( boost::asio::io_service & io_service, registry & reg) :
- socket_( io_service),
+ explicit subscriber_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) :
+ socket_( * io_service),
reg_( reg) {
}
// this function is executed inside the fiber
void run() {
- std::string channel;
+ std::string queue;
try {
boost::system::error_code ec;
- // read first message == channel name
+ // read first message == queue name
// async_ready() returns if the the complete message is read
// until this the fiber is suspended until the complete message
// is read int the given buffer 'data'
boost::asio::buffer( data_),
boost::fibers::asio::yield[ec]);
if ( ec) {
- throw std::runtime_error("no channel from subscriber");
+ throw std::runtime_error("no queue from subscriber");
}
- // first message ist equal to the channel name the publisher
+ // first message ist equal to the queue name the publisher
// publishes to
- channel = data_;
- // subscribe to new channel
- reg_.subscribe( channel, shared_from_this() );
+ queue = data_;
+ // subscribe to new queue
+ reg_.subscribe( queue, shared_from_this() );
// read published messages
for (;;) {
// wait for a conditon-variable for new messages
cond_.wait( lk);
std::string data( data_);
lk.unlock();
- std::cout << "subscriber::run(): '" << data << std::endl;
// message '<fini>' terminates subscription
if ( "<fini>" == data) {
break;
std::cout << "subscriber::run(): '" << data << "' written" << std::endl;
}
} catch ( std::exception const& e) {
- std::cerr << "subscriber [" << channel << "] failed: " << e.what() << std::endl;
+ std::cerr << "subscriber [" << queue << "] failed: " << e.what() << std::endl;
}
// close socket
socket_.close();
- // unregister channel
- reg_.unsubscribe( channel, shared_from_this() );
+ // unregister queue
+ reg_.unsubscribe( queue, shared_from_this() );
}
// called from publisher_session (running in other fiber)
subscriptions::~subscriptions() {
- BOOST_FOREACH( subscriber_session_ptr s, subscribers_) {
+ for ( subscriber_session_ptr s : subscribers_) {
s->publish("<fini>");
}
}
void
subscriptions::publish( std::string const& msg) {
- BOOST_FOREACH( subscriber_session_ptr s, subscribers_) {
+ for ( subscriber_session_ptr s : subscribers_) {
s->publish( msg);
}
}
-// a publisher publishes messages on its channel
-// subscriber might register to this channel to get the published messages
-class publisher_session : public boost::enable_shared_from_this< publisher_session > {
+// a publisher publishes messages on its queue
+// subscriber might register to this queue to get the published messages
+class publisher_session : public std::enable_shared_from_this< publisher_session > {
public:
- explicit publisher_session( boost::asio::io_service & io_service, registry & reg) :
- socket_( io_service),
+ explicit publisher_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) :
+ socket_( * io_service),
reg_( reg) {
}
// this function is executed inside the fiber
void run() {
- std::string channel;
+ std::string queue;
try {
boost::system::error_code ec;
// fixed size message
char data[max_length];
- // read first message == channel name
+ // read first message == queue name
// async_ready() returns if the the complete message is read
// until this the fiber is suspended until the complete message
// is read int the given buffer 'data'
boost::asio::buffer( data),
boost::fibers::asio::yield[ec]);
if ( ec) {
- throw std::runtime_error("no channel from publisher");
+ throw std::runtime_error("no queue from publisher");
}
- // first message ist equal to the channel name the publisher
+ // first message ist equal to the queue name the publisher
// publishes to
- channel = data;
- // register the new channel
- reg_.register_channel( channel);
+ queue = data;
+ // register the new queue
+ reg_.register_queue( queue);
// start publishing messages
for (;;) {
// read message from publisher asyncronous
throw boost::system::system_error( ec); //some other error
}
// publish message to all subscribers
- reg_.publish( channel, std::string( data) );
+ reg_.publish( queue, std::string( data) );
}
} catch ( std::exception const& e) {
- std::cerr << "publisher [" << channel << "] failed: " << e.what() << std::endl;
+ std::cerr << "publisher [" << queue << "] failed: " << e.what() << std::endl;
}
// close socket
socket_.close();
- // unregister channel
- reg_.unregister_channel( channel);
+ // unregister queue
+ reg_.unregister_queue( queue);
}
private:
registry & reg_;
};
-typedef boost::shared_ptr< publisher_session > publisher_session_ptr;
+typedef std::shared_ptr< publisher_session > publisher_session_ptr;
// function accepts connections requests from clients acting as a publisher
-void accept_publisher( boost::asio::io_service& io_service,
+void accept_publisher( std::shared_ptr< boost::asio::io_service > const& io_service,
unsigned short port,
registry & reg) {
// create TCP-acceptor
- tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) );
+ tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) );
// loop for accepting connection requests
for (;;) {
boost::system::error_code ec;
// create new publisher-session
// this instance will be associated with one publisher
publisher_session_ptr new_publisher_session =
- boost::make_shared<publisher_session>( boost::ref( io_service), boost::ref( reg) );
+ std::make_shared< publisher_session >( io_service, std::ref( reg) );
// async. accept of new connection request
// this function will suspend this execution context (fiber) until a
// connection was established, after returning from this function a new client (publisher)
if ( ! ec) {
// run the new publisher in its own fiber (one fiber for one client)
boost::fibers::fiber(
- boost::bind( & publisher_session::run, new_publisher_session) ).detach();
+ std::bind( & publisher_session::run, new_publisher_session) ).detach();
}
}
}
// function accepts connections requests from clients acting as a subscriber
-void accept_subscriber( boost::asio::io_service& io_service,
+void accept_subscriber( std::shared_ptr< boost::asio::io_service > const& io_service,
unsigned short port,
registry & reg) {
// create TCP-acceptor
- tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) );
+ tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) );
// loop for accepting connection requests
for (;;) {
boost::system::error_code ec;
// create new subscriber-session
// this instance will be associated with one subscriber
subscriber_session_ptr new_subscriber_session =
- boost::make_shared<subscriber_session>( boost::ref( io_service), boost::ref( reg) );
+ std::make_shared< subscriber_session >( io_service, std::ref( reg) );
// async. accept of new connection request
// this function will suspend this execution context (fiber) until a
// connection was established, after returning from this function a new client (subscriber)
if ( ! ec) {
// run the new subscriber in its own fiber (one fiber for one client)
boost::fibers::fiber(
- boost::bind( & subscriber_session::run, new_subscriber_session) ).detach();
+ std::bind( & subscriber_session::run, new_subscriber_session) ).detach();
}
}
}
int main( int argc, char* argv[]) {
try {
// create io_service for async. I/O
- boost::asio::io_service io_service;
+ std::shared_ptr< boost::asio::io_service > io_service = std::make_shared< boost::asio::io_service >();
// register asio scheduler
boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service);
- // registry for channels and its subscription
+ // registry for queues and its subscription
registry reg;
// create an acceptor for publishers, run it as fiber
boost::fibers::fiber(
- accept_publisher, boost::ref( io_service), 9997, boost::ref( reg) ).detach();
+ accept_publisher, std::ref( io_service), 9997, std::ref( reg) ).detach();
// create an acceptor for subscribers, run it as fiber
boost::fibers::fiber(
- accept_subscriber, boost::ref( io_service), 9998, boost::ref( reg) ).detach();
+ accept_subscriber, std::ref( io_service), 9998, std::ref( reg) ).detach();
// dispatch
- io_service.run();
+ io_service->run();
return EXIT_SUCCESS;
} catch ( std::exception const& e) {
std::cerr << "Exception: " << e.what() << "\n";