]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/boost/libs/fiber/examples/asio/ps/server.cpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / libs / fiber / examples / asio / ps / server.cpp
index 5bd06cee24caa88c0ee6a5f491ad2409daaba4f5..aeb58f236012d8a170aa12c854238feacdcc8e40 100644 (file)
@@ -6,17 +6,12 @@
 #include <cstddef>
 #include <cstdlib>
 #include <map>
+#include <memory>
 #include <set>
 #include <iostream>
 #include <string>
 
 #include <boost/asio.hpp>
-#include <boost/bind.hpp>
-#include <boost/enable_shared_from_this.hpp>
-#include <boost/foreach.hpp>
-#include <boost/make_shared.hpp>
-#include <boost/ref.hpp>
-#include <boost/shared_ptr.hpp>
 #include <boost/utility.hpp>
 
 #include <boost/fiber/all.hpp>
@@ -28,20 +23,20 @@ using boost::asio::ip::tcp;
 const std::size_t max_length = 1024;
 
 class subscriber_session;
-typedef boost::shared_ptr<subscriber_session> subscriber_session_ptr;
+typedef std::shared_ptr< subscriber_session > subscriber_session_ptr;
 
-// a channel has n subscribers (subscriptions)
-// this class holds a list of subcribers for one channel
+// a queue has n subscribers (subscriptions)
+// this class holds a list of subcribers for one queue
 class subscriptions {
 public:
     ~subscriptions();
 
-    // subscribe to this channel
+    // subscribe to this queue
     void subscribe( subscriber_session_ptr const& s) {
         subscribers_.insert( s);
     }
 
-    // unsubscribe from this channel
+    // unsubscribe from this queue
     void unsubscribe( subscriber_session_ptr const& s) {
         subscribers_.erase(s);
     }
@@ -54,90 +49,90 @@ private:
     std::set< subscriber_session_ptr >  subscribers_;
 };
 
-// a class to register channels and to subsribe clients to this channels
+// a class to register queues and to subsribe clients to this queues
 class registry : private boost::noncopyable {
 private:
-    typedef std::map< std::string, boost::shared_ptr< subscriptions > > channels_cont;
-    typedef channels_cont::iterator channels_iter;
+    typedef std::map< std::string, std::shared_ptr< subscriptions > > queues_cont;
+    typedef queues_cont::iterator queues_iter;
 
     boost::fibers::mutex    mtx_;
-    channels_cont           channels_;
+    queues_cont           queues_;
 
-    void register_channel_( std::string const& channel) {
-        if ( channels_.end() != channels_.find( channel) ) {
-            throw std::runtime_error("channel already exists");
+    void register_queue_( std::string const& queue) {
+        if ( queues_.end() != queues_.find( queue) ) {
+            throw std::runtime_error("queue already exists");
         }
-        channels_[channel] = boost::make_shared< subscriptions >();
-        std::cout << "new channel '" << channel << "' registered" << std::endl;
+        queues_[queue] = std::make_shared< subscriptions >();
+        std::cout << "new queue '" << queue << "' registered" << std::endl;
     }
 
-    void unregister_channel_( std::string const& channel) {
-        channels_.erase( channel);
-        std::cout << "channel '" << channel << "' unregistered" << std::endl;
+    void unregister_queue_( std::string const& queue) {
+        queues_.erase( queue);
+        std::cout << "queue '" << queue << "' unregistered" << std::endl;
     }
 
-    void subscribe_( std::string const& channel, subscriber_session_ptr s) {
-        channels_iter iter = channels_.find( channel);
-        if ( channels_.end() == iter ) {
-            throw std::runtime_error("channel does not exist");
+    void subscribe_( std::string const& queue, subscriber_session_ptr s) {
+        queues_iter iter = queues_.find( queue);
+        if ( queues_.end() == iter ) {
+            throw std::runtime_error("queue does not exist");
         }
         iter->second->subscribe( s);
-        std::cout << "new subscription to channel '" << channel << "'" << std::endl;
+        std::cout << "new subscription to queue '" << queue << "'" << std::endl;
     }
 
-    void unsubscribe_( std::string const& channel, subscriber_session_ptr s) {
-        channels_iter iter = channels_.find( channel);
-        if ( channels_.end() != iter ) {
+    void unsubscribe_( std::string const& queue, subscriber_session_ptr s) {
+        queues_iter iter = queues_.find( queue);
+        if ( queues_.end() != iter ) {
             iter->second->unsubscribe( s);
         }
     }
 
-    void publish_( std::string const& channel, std::string const& msg) {
-        channels_iter iter = channels_.find( channel);
-        if ( channels_.end() == iter ) {
-            throw std::runtime_error("channel does not exist");
+    void publish_( std::string const& queue, std::string const& msg) {
+        queues_iter iter = queues_.find( queue);
+        if ( queues_.end() == iter ) {
+            throw std::runtime_error("queue does not exist");
         }
         iter->second->publish( msg);
-        std::cout << "message '" << msg << "' to publish on channel '" << channel << "'" << std::endl;
+        std::cout << "message '" << msg << "' to publish on queue '" << queue << "'" << std::endl;
     }
 
 public:
-    // add a channel to registry
-    void register_channel( std::string const& channel) {
+    // add a queue to registry
+    void register_queue( std::string const& queue) {
         std::unique_lock< boost::fibers::mutex > lk( mtx_);
-        register_channel_( channel);
+        register_queue_( queue);
     }
 
-    // remove a channel from registry
-    void unregister_channel( std::string const& channel) {
+    // remove a queue from registry
+    void unregister_queue( std::string const& queue) {
         std::unique_lock< boost::fibers::mutex > lk( mtx_);
-        unregister_channel_( channel);
+        unregister_queue_( queue);
     }
 
-    // subscribe to a channel
-    void subscribe( std::string const& channel, subscriber_session_ptr s) {
+    // subscribe to a queue
+    void subscribe( std::string const& queue, subscriber_session_ptr s) {
         std::unique_lock< boost::fibers::mutex > lk( mtx_);
-        subscribe_( channel, s);
+        subscribe_( queue, s);
     }
 
-    // unsubscribe from a channel
-    void unsubscribe( std::string const& channel, subscriber_session_ptr s) {
+    // unsubscribe from a queue
+    void unsubscribe( std::string const& queue, subscriber_session_ptr s) {
         std::unique_lock< boost::fibers::mutex > lk( mtx_);
-        unsubscribe_( channel, s);
+        unsubscribe_( queue, s);
     }
 
-    // publish a message to all subscribers registerd to the channel
-    void publish( std::string const& channel, std::string const& msg) {
+    // publish a message to all subscribers registerd to the queue
+    void publish( std::string const& queue, std::string const& msg) {
         std::unique_lock< boost::fibers::mutex > lk( mtx_);
-        publish_( channel, msg);
+        publish_( queue, msg);
     }
 };
 
-// a subscriber subscribes to a given channel in order to receive messages published on this channel
-class subscriber_session : public boost::enable_shared_from_this< subscriber_session > {
+// a subscriber subscribes to a given queue in order to receive messages published on this queue
+class subscriber_session : public std::enable_shared_from_this< subscriber_session > {
 public:
-    explicit subscriber_session( boost::asio::io_service & io_service, registry & reg) :
-        socket_( io_service),
+    explicit subscriber_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) :
+        socket_( io_service),
         reg_( reg) {
     }
 
@@ -147,10 +142,10 @@ public:
 
     // this function is executed inside the fiber
     void run() {
-        std::string channel;
+        std::string queue;
         try {
             boost::system::error_code ec;
-            // read first message == channel name
+            // read first message == queue name
             // async_ready() returns if the the complete message is read
             // until this the fiber is suspended until the complete message
             // is read int the given buffer 'data'
@@ -159,13 +154,13 @@ public:
                     boost::asio::buffer( data_),
                     boost::fibers::asio::yield[ec]);
             if ( ec) {
-                throw std::runtime_error("no channel from subscriber");
+                throw std::runtime_error("no queue from subscriber");
             }
-            // first message ist equal to the channel name the publisher
+            // first message ist equal to the queue name the publisher
             // publishes to
-            channel = data_;
-            // subscribe to new channel
-            reg_.subscribe( channel, shared_from_this() );
+            queue = data_;
+            // subscribe to new queue
+            reg_.subscribe( queue, shared_from_this() );
             // read published messages
             for (;;) {
                 // wait for a conditon-variable for new messages
@@ -176,7 +171,6 @@ public:
                 cond_.wait( lk);
                 std::string data( data_);
                 lk.unlock();
-                std::cout << "subscriber::run(): '" << data << std::endl;
                 // message '<fini>' terminates subscription
                 if ( "<fini>" == data) {
                     break;
@@ -197,12 +191,12 @@ public:
                 std::cout << "subscriber::run(): '" << data << "' written" << std::endl;
             }
         } catch ( std::exception const& e) {
-            std::cerr << "subscriber [" << channel << "] failed: " << e.what() << std::endl;
+            std::cerr << "subscriber [" << queue << "] failed: " << e.what() << std::endl;
         }
         // close socket
         socket_.close();
-        // unregister channel
-        reg_.unsubscribe( channel, shared_from_this() );
+        // unregister queue
+        reg_.unsubscribe( queue, shared_from_this() );
     }
 
     // called from publisher_session (running in other fiber)
@@ -224,24 +218,24 @@ private:
 
 
 subscriptions::~subscriptions() {
-    BOOST_FOREACH( subscriber_session_ptr s, subscribers_) {
+    for ( subscriber_session_ptr s : subscribers_) {
         s->publish("<fini>");
     } 
 }
 
 void
 subscriptions::publish( std::string const& msg) {
-    BOOST_FOREACH( subscriber_session_ptr s, subscribers_) {
+    for ( subscriber_session_ptr s : subscribers_) {
         s->publish( msg);
     }
 }
 
-// a publisher publishes messages on its channel
-// subscriber might register to this channel to get the published messages
-class publisher_session : public boost::enable_shared_from_this< publisher_session > {
+// a publisher publishes messages on its queue
+// subscriber might register to this queue to get the published messages
+class publisher_session : public std::enable_shared_from_this< publisher_session > {
 public:
-    explicit publisher_session( boost::asio::io_service & io_service, registry & reg) :
-        socket_( io_service),
+    explicit publisher_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) :
+        socket_( io_service),
         reg_( reg) {
     }
 
@@ -251,12 +245,12 @@ public:
 
     // this function is executed inside the fiber
     void run() {
-        std::string channel;
+        std::string queue;
         try {
             boost::system::error_code ec;
             // fixed size message
             char data[max_length];
-            // read first message == channel name
+            // read first message == queue name
             // async_ready() returns if the the complete message is read
             // until this the fiber is suspended until the complete message
             // is read int the given buffer 'data'
@@ -265,13 +259,13 @@ public:
                     boost::asio::buffer( data),
                     boost::fibers::asio::yield[ec]);
             if ( ec) {
-                throw std::runtime_error("no channel from publisher");
+                throw std::runtime_error("no queue from publisher");
             }
-            // first message ist equal to the channel name the publisher
+            // first message ist equal to the queue name the publisher
             // publishes to
-            channel = data;
-            // register the new channel
-            reg_.register_channel( channel);
+            queue = data;
+            // register the new queue
+            reg_.register_queue( queue);
             // start publishing messages
             for (;;) {
                 // read message from publisher asyncronous
@@ -287,15 +281,15 @@ public:
                     throw boost::system::system_error( ec); //some other error
                 }
                 // publish message to all subscribers
-                reg_.publish( channel, std::string( data) );
+                reg_.publish( queue, std::string( data) );
             }
         } catch ( std::exception const& e) {
-            std::cerr << "publisher [" << channel << "] failed: " << e.what() << std::endl;
+            std::cerr << "publisher [" << queue << "] failed: " << e.what() << std::endl;
         }
         // close socket
         socket_.close();
-        // unregister channel
-        reg_.unregister_channel( channel);
+        // unregister queue
+        reg_.unregister_queue( queue);
     }
 
 private:
@@ -303,21 +297,21 @@ private:
     registry        &   reg_;
 };
 
-typedef boost::shared_ptr< publisher_session > publisher_session_ptr;
+typedef std::shared_ptr< publisher_session > publisher_session_ptr;
 
 // function accepts connections requests from clients acting as a publisher
-void accept_publisher( boost::asio::io_service& io_service,
+void accept_publisher( std::shared_ptr< boost::asio::io_service > const& io_service,
                        unsigned short port,
                        registry & reg) {
     // create TCP-acceptor
-    tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) );
+    tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) );
     // loop for accepting connection requests
     for (;;) {
         boost::system::error_code ec;
         // create new publisher-session
         // this instance will be associated with one publisher
         publisher_session_ptr new_publisher_session = 
-            boost::make_shared<publisher_session>( boost::ref( io_service), boost::ref( reg) );
+            std::make_shared< publisher_session >( io_service, std::ref( reg) );
         // async. accept of new connection request
         // this function will suspend this execution context (fiber) until a
         // connection was established, after returning from this function a new client (publisher)
@@ -328,24 +322,24 @@ void accept_publisher( boost::asio::io_service& io_service,
         if ( ! ec) {
             // run the new publisher in its own fiber (one fiber for one client)
             boost::fibers::fiber(
-                boost::bind( & publisher_session::run, new_publisher_session) ).detach();
+                std::bind( & publisher_session::run, new_publisher_session) ).detach();
         }
     }
 }
 
 // function accepts connections requests from clients acting as a subscriber
-void accept_subscriber( boost::asio::io_service& io_service,
+void accept_subscriber( std::shared_ptr< boost::asio::io_service > const& io_service,
                         unsigned short port,
                         registry & reg) {
     // create TCP-acceptor
-    tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) );
+    tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) );
     // loop for accepting connection requests
     for (;;) {
         boost::system::error_code ec;
         // create new subscriber-session
         // this instance will be associated with one subscriber
         subscriber_session_ptr new_subscriber_session = 
-            boost::make_shared<subscriber_session>( boost::ref( io_service), boost::ref( reg) );
+            std::make_shared< subscriber_session >( io_service, std::ref( reg) );
         // async. accept of new connection request
         // this function will suspend this execution context (fiber) until a
         // connection was established, after returning from this function a new client (subscriber)
@@ -356,7 +350,7 @@ void accept_subscriber( boost::asio::io_service& io_service,
         if ( ! ec) {
             // run the new subscriber in its own fiber (one fiber for one client)
             boost::fibers::fiber(
-                boost::bind( & subscriber_session::run, new_subscriber_session) ).detach();
+                std::bind( & subscriber_session::run, new_subscriber_session) ).detach();
         }
     }
 }
@@ -365,19 +359,19 @@ void accept_subscriber( boost::asio::io_service& io_service,
 int main( int argc, char* argv[]) {
     try {
         // create io_service for async. I/O
-        boost::asio::io_service io_service;
+        std::shared_ptr< boost::asio::io_service > io_service = std::make_shared< boost::asio::io_service >();
         // register asio scheduler
         boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service);
-        // registry for channels and its subscription
+        // registry for queues and its subscription
         registry reg;
         // create an acceptor for publishers, run it as fiber
         boost::fibers::fiber(
-            accept_publisher, boost::ref( io_service), 9997, boost::ref( reg) ).detach();
+            accept_publisher, std::ref( io_service), 9997, std::ref( reg) ).detach();
         // create an acceptor for subscribers, run it as fiber
         boost::fibers::fiber(
-            accept_subscriber, boost::ref( io_service), 9998, boost::ref( reg) ).detach();
+            accept_subscriber, std::ref( io_service), 9998, std::ref( reg) ).detach();
         // dispatch
-        io_service.run();
+        io_service->run();
         return EXIT_SUCCESS;
     } catch ( std::exception const& e) {
         std::cerr << "Exception: " << e.what() << "\n";