]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/fiber/examples/asio/ps/server.cpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / libs / fiber / examples / asio / ps / server.cpp
CommitLineData
7c673cae
FG
1// Copyright Oliver Kowalke 2015.
2// Distributed under the Boost Software License, Version 1.0.
3// (See accompanying file LICENSE_1_0.txt or copy at
4// http://www.boost.org/LICENSE_1_0.txt)
5
6#include <cstddef>
7#include <cstdlib>
8#include <map>
b32b8144 9#include <memory>
7c673cae
FG
10#include <set>
11#include <iostream>
12#include <string>
13
14#include <boost/asio.hpp>
7c673cae
FG
15#include <boost/utility.hpp>
16
17#include <boost/fiber/all.hpp>
18#include "../round_robin.hpp"
19#include "../yield.hpp"
20
21using boost::asio::ip::tcp;
22
23const std::size_t max_length = 1024;
24
25class subscriber_session;
b32b8144 26typedef std::shared_ptr< subscriber_session > subscriber_session_ptr;
7c673cae 27
b32b8144
FG
28// a queue has n subscribers (subscriptions)
29// this class holds the set of subscribers for one queue
7c673cae
FG
30class subscriptions {
31public:
32 ~subscriptions();
33
b32b8144 34 // subscribe to this queue
7c673cae
FG
35 void subscribe( subscriber_session_ptr const& s) {
36 subscribers_.insert( s);
37 }
38
b32b8144 39 // unsubscribe from this queue
7c673cae
FG
40 void unsubscribe( subscriber_session_ptr const& s) {
41 subscribers_.erase(s);
42 }
43
44 // publish a message, e.g. push this message to all subscribers
45 void publish( std::string const& msg);
46
47private:
48 // list of subscribers
49 std::set< subscriber_session_ptr > subscribers_;
50};
51
b32b8144 52// a class to register queues and to subscribe clients to these queues
7c673cae
FG
53class registry : private boost::noncopyable {
54private:
b32b8144
FG
55 typedef std::map< std::string, std::shared_ptr< subscriptions > > queues_cont;
56 typedef queues_cont::iterator queues_iter;
7c673cae
FG
57
58 boost::fibers::mutex mtx_;
b32b8144 59 queues_cont queues_;
7c673cae 60
b32b8144
FG
61 void register_queue_( std::string const& queue) {
62 if ( queues_.end() != queues_.find( queue) ) {
63 throw std::runtime_error("queue already exists");
7c673cae 64 }
b32b8144
FG
65 queues_[queue] = std::make_shared< subscriptions >();
66 std::cout << "new queue '" << queue << "' registered" << std::endl;
7c673cae
FG
67 }
68
b32b8144
FG
69 void unregister_queue_( std::string const& queue) {
70 queues_.erase( queue);
71 std::cout << "queue '" << queue << "' unregistered" << std::endl;
7c673cae
FG
72 }
73
b32b8144
FG
74 void subscribe_( std::string const& queue, subscriber_session_ptr s) {
75 queues_iter iter = queues_.find( queue);
76 if ( queues_.end() == iter ) {
77 throw std::runtime_error("queue does not exist");
7c673cae
FG
78 }
79 iter->second->subscribe( s);
b32b8144 80 std::cout << "new subscription to queue '" << queue << "'" << std::endl;
7c673cae
FG
81 }
82
b32b8144
FG
83 void unsubscribe_( std::string const& queue, subscriber_session_ptr s) {
84 queues_iter iter = queues_.find( queue);
85 if ( queues_.end() != iter ) {
7c673cae
FG
86 iter->second->unsubscribe( s);
87 }
88 }
89
b32b8144
FG
90 void publish_( std::string const& queue, std::string const& msg) {
91 queues_iter iter = queues_.find( queue);
92 if ( queues_.end() == iter ) {
93 throw std::runtime_error("queue does not exist");
7c673cae
FG
94 }
95 iter->second->publish( msg);
b32b8144 96 std::cout << "message '" << msg << "' to publish on queue '" << queue << "'" << std::endl;
7c673cae
FG
97 }
98
99public:
b32b8144
FG
100 // add a queue to registry
101 void register_queue( std::string const& queue) {
7c673cae 102 std::unique_lock< boost::fibers::mutex > lk( mtx_);
b32b8144 103 register_queue_( queue);
7c673cae
FG
104 }
105
b32b8144
FG
106 // remove a queue from registry
107 void unregister_queue( std::string const& queue) {
7c673cae 108 std::unique_lock< boost::fibers::mutex > lk( mtx_);
b32b8144 109 unregister_queue_( queue);
7c673cae
FG
110 }
111
b32b8144
FG
112 // subscribe to a queue
113 void subscribe( std::string const& queue, subscriber_session_ptr s) {
7c673cae 114 std::unique_lock< boost::fibers::mutex > lk( mtx_);
b32b8144 115 subscribe_( queue, s);
7c673cae
FG
116 }
117
b32b8144
FG
118 // unsubscribe from a queue
119 void unsubscribe( std::string const& queue, subscriber_session_ptr s) {
7c673cae 120 std::unique_lock< boost::fibers::mutex > lk( mtx_);
b32b8144 121 unsubscribe_( queue, s);
7c673cae
FG
122 }
123
b32b8144
FG
124 // publish a message to all subscribers registerd to the queue
125 void publish( std::string const& queue, std::string const& msg) {
7c673cae 126 std::unique_lock< boost::fibers::mutex > lk( mtx_);
b32b8144 127 publish_( queue, msg);
7c673cae
FG
128 }
129};
130
b32b8144
FG
131// a subscriber subscribes to a given queue in order to receive messages published on this queue
132class subscriber_session : public std::enable_shared_from_this< subscriber_session > {
7c673cae 133public:
b32b8144
FG
134 explicit subscriber_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) :
135 socket_( * io_service),
7c673cae
FG
136 reg_( reg) {
137 }
138
139 tcp::socket& socket() {
140 return socket_;
141 }
142
143 // this function is executed inside the fiber
144 void run() {
b32b8144 145 std::string queue;
7c673cae
FG
146 try {
147 boost::system::error_code ec;
b32b8144 148 // read first message == queue name
7c673cae
FG
149 // async_ready() returns if the the complete message is read
150 // until this the fiber is suspended until the complete message
151 // is read int the given buffer 'data'
152 boost::asio::async_read(
153 socket_,
154 boost::asio::buffer( data_),
155 boost::fibers::asio::yield[ec]);
156 if ( ec) {
b32b8144 157 throw std::runtime_error("no queue from subscriber");
7c673cae 158 }
b32b8144 159 // first message ist equal to the queue name the publisher
7c673cae 160 // publishes to
b32b8144
FG
161 queue = data_;
162 // subscribe to new queue
163 reg_.subscribe( queue, shared_from_this() );
7c673cae
FG
164 // read published messages
165 for (;;) {
166 // wait for a conditon-variable for new messages
167 // the fiber will be suspended until the condtion
168 // becomes true and the fiber is resumed
169 // published message is stored in buffer 'data_'
170 std::unique_lock< boost::fibers::mutex > lk( mtx_);
171 cond_.wait( lk);
172 std::string data( data_);
173 lk.unlock();
7c673cae
FG
174 // message '<fini>' terminates subscription
175 if ( "<fini>" == data) {
176 break;
177 }
178 // async. write message to socket connected with
179 // subscriber
180 // async_write() returns if the complete message was writen
181 // the fiber is suspended in the meanwhile
182 boost::asio::async_write(
183 socket_,
184 boost::asio::buffer( data, data.size() ),
185 boost::fibers::asio::yield[ec]);
186 if ( ec == boost::asio::error::eof) {
187 break; //connection closed cleanly by peer
188 } else if ( ec) {
189 throw boost::system::system_error( ec); //some other error
190 }
191 std::cout << "subscriber::run(): '" << data << "' written" << std::endl;
192 }
193 } catch ( std::exception const& e) {
b32b8144 194 std::cerr << "subscriber [" << queue << "] failed: " << e.what() << std::endl;
7c673cae
FG
195 }
196 // close socket
197 socket_.close();
b32b8144
FG
198 // unregister queue
199 reg_.unsubscribe( queue, shared_from_this() );
7c673cae
FG
200 }
201
202 // called from publisher_session (running in other fiber)
203 void publish( std::string const& msg) {
204 std::unique_lock< boost::fibers::mutex > lk( mtx_);
205 std::memset( data_, '\0', sizeof( data_));
206 std::memcpy( data_, msg.c_str(), (std::min)(max_length, msg.size()));
207 cond_.notify_one();
208 }
209
210private:
211 tcp::socket socket_;
212 registry & reg_;
213 boost::fibers::mutex mtx_;
214 boost::fibers::condition_variable cond_;
215 // fixed size message
216 char data_[max_length];
217};
218
219
220subscriptions::~subscriptions() {
b32b8144 221 for ( subscriber_session_ptr s : subscribers_) {
7c673cae
FG
222 s->publish("<fini>");
223 }
224}
225
226void
227subscriptions::publish( std::string const& msg) {
b32b8144 228 for ( subscriber_session_ptr s : subscribers_) {
7c673cae
FG
229 s->publish( msg);
230 }
231}
232
b32b8144
FG
233// a publisher publishes messages on its queue
234// subscribers may subscribe to this queue to receive the published messages
235class publisher_session : public std::enable_shared_from_this< publisher_session > {
7c673cae 236public:
b32b8144
FG
237 explicit publisher_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) :
238 socket_( * io_service),
7c673cae
FG
239 reg_( reg) {
240 }
241
242 tcp::socket& socket() {
243 return socket_;
244 }
245
246 // this function is executed inside the fiber
247 void run() {
b32b8144 248 std::string queue;
7c673cae
FG
249 try {
250 boost::system::error_code ec;
251 // fixed size message
252 char data[max_length];
b32b8144 253 // read first message == queue name
7c673cae
FG
254 // async_ready() returns if the the complete message is read
255 // until this the fiber is suspended until the complete message
256 // is read int the given buffer 'data'
257 boost::asio::async_read(
258 socket_,
259 boost::asio::buffer( data),
260 boost::fibers::asio::yield[ec]);
261 if ( ec) {
b32b8144 262 throw std::runtime_error("no queue from publisher");
7c673cae 263 }
b32b8144 264 // first message ist equal to the queue name the publisher
7c673cae 265 // publishes to
b32b8144
FG
266 queue = data;
267 // register the new queue
268 reg_.register_queue( queue);
7c673cae
FG
269 // start publishing messages
270 for (;;) {
271 // read message from publisher asyncronous
272 // async_read() suspends this fiber until the complete emssage is read
273 // and stored in the given buffer 'data'
274 boost::asio::async_read(
275 socket_,
276 boost::asio::buffer( data),
277 boost::fibers::asio::yield[ec]);
278 if ( ec == boost::asio::error::eof) {
279 break; //connection closed cleanly by peer
280 } else if ( ec) {
281 throw boost::system::system_error( ec); //some other error
282 }
283 // publish message to all subscribers
b32b8144 284 reg_.publish( queue, std::string( data) );
7c673cae
FG
285 }
286 } catch ( std::exception const& e) {
b32b8144 287 std::cerr << "publisher [" << queue << "] failed: " << e.what() << std::endl;
7c673cae
FG
288 }
289 // close socket
290 socket_.close();
b32b8144
FG
291 // unregister queue
292 reg_.unregister_queue( queue);
7c673cae
FG
293 }
294
295private:
296 tcp::socket socket_;
297 registry & reg_;
298};
299
b32b8144 300typedef std::shared_ptr< publisher_session > publisher_session_ptr;
7c673cae
FG
301
302// accepts connection requests from clients acting as publishers
b32b8144 303void accept_publisher( std::shared_ptr< boost::asio::io_service > const& io_service,
7c673cae
FG
304 unsigned short port,
305 registry & reg) {
306 // create TCP-acceptor
b32b8144 307 tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) );
7c673cae
FG
308 // loop for accepting connection requests
309 for (;;) {
310 boost::system::error_code ec;
311 // create new publisher-session
312 // this instance will be associated with one publisher
313 publisher_session_ptr new_publisher_session =
b32b8144 314 std::make_shared< publisher_session >( io_service, std::ref( reg) );
7c673cae
FG
315 // async. accept of new connection request
316 // this function will suspend this execution context (fiber) until a
317 // connection was established, after returning from this function a new client (publisher)
318 // is connected
319 acceptor.async_accept(
320 new_publisher_session->socket(),
321 boost::fibers::asio::yield[ec]);
322 if ( ! ec) {
323 // run the new publisher in its own fiber (one fiber for one client)
324 boost::fibers::fiber(
b32b8144 325 std::bind( & publisher_session::run, new_publisher_session) ).detach();
7c673cae
FG
326 }
327 }
328}
329
330// accepts connection requests from clients acting as subscribers
b32b8144 331void accept_subscriber( std::shared_ptr< boost::asio::io_service > const& io_service,
7c673cae
FG
332 unsigned short port,
333 registry & reg) {
334 // create TCP-acceptor
b32b8144 335 tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) );
7c673cae
FG
336 // loop for accepting connection requests
337 for (;;) {
338 boost::system::error_code ec;
339 // create new subscriber-session
340 // this instance will be associated with one subscriber
341 subscriber_session_ptr new_subscriber_session =
b32b8144 342 std::make_shared< subscriber_session >( io_service, std::ref( reg) );
7c673cae
FG
343 // async. accept of new connection request
344 // this function will suspend this execution context (fiber) until a
345 // connection was established, after returning from this function a new client (subscriber)
346 // is connected
347 acceptor.async_accept(
348 new_subscriber_session->socket(),
349 boost::fibers::asio::yield[ec]);
350 if ( ! ec) {
351 // run the new subscriber in its own fiber (one fiber for one client)
352 boost::fibers::fiber(
b32b8144 353 std::bind( & subscriber_session::run, new_subscriber_session) ).detach();
7c673cae
FG
354 }
355 }
356}
357
358
359int main( int argc, char* argv[]) {
360 try {
361 // create io_service for async. I/O
b32b8144 362 std::shared_ptr< boost::asio::io_service > io_service = std::make_shared< boost::asio::io_service >();
7c673cae
FG
363 // register asio scheduler
364 boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service);
b32b8144 365 // registry for queues and its subscription
7c673cae
FG
366 registry reg;
367 // create an acceptor for publishers, run it as fiber
368 boost::fibers::fiber(
b32b8144 369 accept_publisher, std::ref( io_service), 9997, std::ref( reg) ).detach();
7c673cae
FG
370 // create an acceptor for subscribers, run it as fiber
371 boost::fibers::fiber(
b32b8144 372 accept_subscriber, std::ref( io_service), 9998, std::ref( reg) ).detach();
7c673cae 373 // dispatch
b32b8144 374 io_service->run();
7c673cae
FG
375 return EXIT_SUCCESS;
376 } catch ( std::exception const& e) {
377 std::cerr << "Exception: " << e.what() << "\n";
378 }
379
380 return EXIT_FAILURE;
381}