]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/fiber/examples/asio/ps/server.cpp
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / boost / libs / fiber / examples / asio / ps / server.cpp
1 // Copyright Oliver Kowalke 2015.
2 // Distributed under the Boost Software License, Version 1.0.
3 // (See accompanying file LICENSE_1_0.txt or copy at
4 // http://www.boost.org/LICENSE_1_0.txt)
5
#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <map>
#include <mutex>
#include <set>
#include <stdexcept>
#include <string>

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/foreach.hpp>
#include <boost/make_shared.hpp>
#include <boost/ref.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/utility.hpp>

#include <boost/fiber/all.hpp>
#include "../round_robin.hpp"
#include "../yield.hpp"
25
26 using boost::asio::ip::tcp;
27
// size in bytes of every fixed-size message buffer used by the sessions below
std::size_t const max_length = 1024;
29
30 class subscriber_session;
31 typedef boost::shared_ptr<subscriber_session> subscriber_session_ptr;
32
// a channel has n subscribers (subscriptions)
// this class holds the set of subscribers of one channel
35 class subscriptions {
36 public:
37 ~subscriptions();
38
39 // subscribe to this channel
40 void subscribe( subscriber_session_ptr const& s) {
41 subscribers_.insert( s);
42 }
43
44 // unsubscribe from this channel
45 void unsubscribe( subscriber_session_ptr const& s) {
46 subscribers_.erase(s);
47 }
48
49 // publish a message, e.g. push this message to all subscribers
50 void publish( std::string const& msg);
51
52 private:
53 // list of subscribers
54 std::set< subscriber_session_ptr > subscribers_;
55 };
56
// a class to register channels and to subscribe clients to these channels
58 class registry : private boost::noncopyable {
59 private:
60 typedef std::map< std::string, boost::shared_ptr< subscriptions > > channels_cont;
61 typedef channels_cont::iterator channels_iter;
62
63 boost::fibers::mutex mtx_;
64 channels_cont channels_;
65
66 void register_channel_( std::string const& channel) {
67 if ( channels_.end() != channels_.find( channel) ) {
68 throw std::runtime_error("channel already exists");
69 }
70 channels_[channel] = boost::make_shared< subscriptions >();
71 std::cout << "new channel '" << channel << "' registered" << std::endl;
72 }
73
74 void unregister_channel_( std::string const& channel) {
75 channels_.erase( channel);
76 std::cout << "channel '" << channel << "' unregistered" << std::endl;
77 }
78
79 void subscribe_( std::string const& channel, subscriber_session_ptr s) {
80 channels_iter iter = channels_.find( channel);
81 if ( channels_.end() == iter ) {
82 throw std::runtime_error("channel does not exist");
83 }
84 iter->second->subscribe( s);
85 std::cout << "new subscription to channel '" << channel << "'" << std::endl;
86 }
87
88 void unsubscribe_( std::string const& channel, subscriber_session_ptr s) {
89 channels_iter iter = channels_.find( channel);
90 if ( channels_.end() != iter ) {
91 iter->second->unsubscribe( s);
92 }
93 }
94
95 void publish_( std::string const& channel, std::string const& msg) {
96 channels_iter iter = channels_.find( channel);
97 if ( channels_.end() == iter ) {
98 throw std::runtime_error("channel does not exist");
99 }
100 iter->second->publish( msg);
101 std::cout << "message '" << msg << "' to publish on channel '" << channel << "'" << std::endl;
102 }
103
104 public:
105 // add a channel to registry
106 void register_channel( std::string const& channel) {
107 std::unique_lock< boost::fibers::mutex > lk( mtx_);
108 register_channel_( channel);
109 }
110
111 // remove a channel from registry
112 void unregister_channel( std::string const& channel) {
113 std::unique_lock< boost::fibers::mutex > lk( mtx_);
114 unregister_channel_( channel);
115 }
116
117 // subscribe to a channel
118 void subscribe( std::string const& channel, subscriber_session_ptr s) {
119 std::unique_lock< boost::fibers::mutex > lk( mtx_);
120 subscribe_( channel, s);
121 }
122
123 // unsubscribe from a channel
124 void unsubscribe( std::string const& channel, subscriber_session_ptr s) {
125 std::unique_lock< boost::fibers::mutex > lk( mtx_);
126 unsubscribe_( channel, s);
127 }
128
129 // publish a message to all subscribers registerd to the channel
130 void publish( std::string const& channel, std::string const& msg) {
131 std::unique_lock< boost::fibers::mutex > lk( mtx_);
132 publish_( channel, msg);
133 }
134 };
135
136 // a subscriber subscribes to a given channel in order to receive messages published on this channel
137 class subscriber_session : public boost::enable_shared_from_this< subscriber_session > {
138 public:
139 explicit subscriber_session( boost::asio::io_service & io_service, registry & reg) :
140 socket_( io_service),
141 reg_( reg) {
142 }
143
144 tcp::socket& socket() {
145 return socket_;
146 }
147
148 // this function is executed inside the fiber
149 void run() {
150 std::string channel;
151 try {
152 boost::system::error_code ec;
153 // read first message == channel name
154 // async_ready() returns if the the complete message is read
155 // until this the fiber is suspended until the complete message
156 // is read int the given buffer 'data'
157 boost::asio::async_read(
158 socket_,
159 boost::asio::buffer( data_),
160 boost::fibers::asio::yield[ec]);
161 if ( ec) {
162 throw std::runtime_error("no channel from subscriber");
163 }
164 // first message ist equal to the channel name the publisher
165 // publishes to
166 channel = data_;
167 // subscribe to new channel
168 reg_.subscribe( channel, shared_from_this() );
169 // read published messages
170 for (;;) {
171 // wait for a conditon-variable for new messages
172 // the fiber will be suspended until the condtion
173 // becomes true and the fiber is resumed
174 // published message is stored in buffer 'data_'
175 std::unique_lock< boost::fibers::mutex > lk( mtx_);
176 cond_.wait( lk);
177 std::string data( data_);
178 lk.unlock();
179 std::cout << "subscriber::run(): '" << data << std::endl;
180 // message '<fini>' terminates subscription
181 if ( "<fini>" == data) {
182 break;
183 }
184 // async. write message to socket connected with
185 // subscriber
186 // async_write() returns if the complete message was writen
187 // the fiber is suspended in the meanwhile
188 boost::asio::async_write(
189 socket_,
190 boost::asio::buffer( data, data.size() ),
191 boost::fibers::asio::yield[ec]);
192 if ( ec == boost::asio::error::eof) {
193 break; //connection closed cleanly by peer
194 } else if ( ec) {
195 throw boost::system::system_error( ec); //some other error
196 }
197 std::cout << "subscriber::run(): '" << data << "' written" << std::endl;
198 }
199 } catch ( std::exception const& e) {
200 std::cerr << "subscriber [" << channel << "] failed: " << e.what() << std::endl;
201 }
202 // close socket
203 socket_.close();
204 // unregister channel
205 reg_.unsubscribe( channel, shared_from_this() );
206 }
207
208 // called from publisher_session (running in other fiber)
209 void publish( std::string const& msg) {
210 std::unique_lock< boost::fibers::mutex > lk( mtx_);
211 std::memset( data_, '\0', sizeof( data_));
212 std::memcpy( data_, msg.c_str(), (std::min)(max_length, msg.size()));
213 cond_.notify_one();
214 }
215
216 private:
217 tcp::socket socket_;
218 registry & reg_;
219 boost::fibers::mutex mtx_;
220 boost::fibers::condition_variable cond_;
221 // fixed size message
222 char data_[max_length];
223 };
224
225
226 subscriptions::~subscriptions() {
227 BOOST_FOREACH( subscriber_session_ptr s, subscribers_) {
228 s->publish("<fini>");
229 }
230 }
231
232 void
233 subscriptions::publish( std::string const& msg) {
234 BOOST_FOREACH( subscriber_session_ptr s, subscribers_) {
235 s->publish( msg);
236 }
237 }
238
// a publisher publishes messages on its channel
// subscribers may subscribe to this channel to receive the published messages
241 class publisher_session : public boost::enable_shared_from_this< publisher_session > {
242 public:
243 explicit publisher_session( boost::asio::io_service & io_service, registry & reg) :
244 socket_( io_service),
245 reg_( reg) {
246 }
247
248 tcp::socket& socket() {
249 return socket_;
250 }
251
252 // this function is executed inside the fiber
253 void run() {
254 std::string channel;
255 try {
256 boost::system::error_code ec;
257 // fixed size message
258 char data[max_length];
259 // read first message == channel name
260 // async_ready() returns if the the complete message is read
261 // until this the fiber is suspended until the complete message
262 // is read int the given buffer 'data'
263 boost::asio::async_read(
264 socket_,
265 boost::asio::buffer( data),
266 boost::fibers::asio::yield[ec]);
267 if ( ec) {
268 throw std::runtime_error("no channel from publisher");
269 }
270 // first message ist equal to the channel name the publisher
271 // publishes to
272 channel = data;
273 // register the new channel
274 reg_.register_channel( channel);
275 // start publishing messages
276 for (;;) {
277 // read message from publisher asyncronous
278 // async_read() suspends this fiber until the complete emssage is read
279 // and stored in the given buffer 'data'
280 boost::asio::async_read(
281 socket_,
282 boost::asio::buffer( data),
283 boost::fibers::asio::yield[ec]);
284 if ( ec == boost::asio::error::eof) {
285 break; //connection closed cleanly by peer
286 } else if ( ec) {
287 throw boost::system::system_error( ec); //some other error
288 }
289 // publish message to all subscribers
290 reg_.publish( channel, std::string( data) );
291 }
292 } catch ( std::exception const& e) {
293 std::cerr << "publisher [" << channel << "] failed: " << e.what() << std::endl;
294 }
295 // close socket
296 socket_.close();
297 // unregister channel
298 reg_.unregister_channel( channel);
299 }
300
301 private:
302 tcp::socket socket_;
303 registry & reg_;
304 };
305
306 typedef boost::shared_ptr< publisher_session > publisher_session_ptr;
307
// accepts connection requests from clients acting as publishers
309 void accept_publisher( boost::asio::io_service& io_service,
310 unsigned short port,
311 registry & reg) {
312 // create TCP-acceptor
313 tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) );
314 // loop for accepting connection requests
315 for (;;) {
316 boost::system::error_code ec;
317 // create new publisher-session
318 // this instance will be associated with one publisher
319 publisher_session_ptr new_publisher_session =
320 boost::make_shared<publisher_session>( boost::ref( io_service), boost::ref( reg) );
321 // async. accept of new connection request
322 // this function will suspend this execution context (fiber) until a
323 // connection was established, after returning from this function a new client (publisher)
324 // is connected
325 acceptor.async_accept(
326 new_publisher_session->socket(),
327 boost::fibers::asio::yield[ec]);
328 if ( ! ec) {
329 // run the new publisher in its own fiber (one fiber for one client)
330 boost::fibers::fiber(
331 boost::bind( & publisher_session::run, new_publisher_session) ).detach();
332 }
333 }
334 }
335
// accepts connection requests from clients acting as subscribers
337 void accept_subscriber( boost::asio::io_service& io_service,
338 unsigned short port,
339 registry & reg) {
340 // create TCP-acceptor
341 tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) );
342 // loop for accepting connection requests
343 for (;;) {
344 boost::system::error_code ec;
345 // create new subscriber-session
346 // this instance will be associated with one subscriber
347 subscriber_session_ptr new_subscriber_session =
348 boost::make_shared<subscriber_session>( boost::ref( io_service), boost::ref( reg) );
349 // async. accept of new connection request
350 // this function will suspend this execution context (fiber) until a
351 // connection was established, after returning from this function a new client (subscriber)
352 // is connected
353 acceptor.async_accept(
354 new_subscriber_session->socket(),
355 boost::fibers::asio::yield[ec]);
356 if ( ! ec) {
357 // run the new subscriber in its own fiber (one fiber for one client)
358 boost::fibers::fiber(
359 boost::bind( & subscriber_session::run, new_subscriber_session) ).detach();
360 }
361 }
362 }
363
364
365 int main( int argc, char* argv[]) {
366 try {
367 // create io_service for async. I/O
368 boost::asio::io_service io_service;
369 // register asio scheduler
370 boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service);
371 // registry for channels and its subscription
372 registry reg;
373 // create an acceptor for publishers, run it as fiber
374 boost::fibers::fiber(
375 accept_publisher, boost::ref( io_service), 9997, boost::ref( reg) ).detach();
376 // create an acceptor for subscribers, run it as fiber
377 boost::fibers::fiber(
378 accept_subscriber, boost::ref( io_service), 9998, boost::ref( reg) ).detach();
379 // dispatch
380 io_service.run();
381 return EXIT_SUCCESS;
382 } catch ( std::exception const& e) {
383 std::cerr << "Exception: " << e.what() << "\n";
384 }
385
386 return EXIT_FAILURE;
387 }