]>
git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/fiber/examples/asio/autoecho.cpp
1 // Copyright 2003-2013 Christopher M. Kohlhoff
2 // Copyright Oliver Kowalke, Nat Goodspeed 2015.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
17 #include <boost/asio.hpp>
18 #include <boost/bind.hpp>
19 #include <boost/shared_ptr.hpp>
21 #include <boost/fiber/all.hpp>
22 #include "round_robin.hpp"
// File-scope declarations shared by the code below.
// Bring boost::asio::ip::tcp into scope under the short name tcp.
25 using boost::asio::ip::tcp
;
// Size of the fixed char buffers declared in session() and client().
27 const int max_length
= 1024;
// Shared-ownership handle to a TCP socket; session() takes one by value.
29 typedef boost::shared_ptr
< tcp::socket
> socket_ptr
;
// Letters handed out one at a time as thread names (see next_ below).
31 const char* const alpha
= "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
33 /*****************************************************************************
35 *****************************************************************************/
// Interior of the thread-name registry class. Its header is not visible in
// this view; the defaulted constructor below shows the class is ThreadNames.
// names_ maps a std::thread::id to the name assigned to that thread.
38 std::map
<std::thread::id
, std::string
> names_
{};
// Cursor into alpha: the next letter that lookup() will hand out.
39 const char* next_
{ alpha
};
43 ThreadNames() = default;
// Returns a std::string name for the calling thread. Holds a lock on mtx_
// (declared outside this view) while it searches and updates names_.
45 std::string
lookup() {
46 std::unique_lock
<std::mutex
> lk( mtx_
);
47 auto this_id( std::this_thread::get_id() );
48 auto found
= names_
.find( this_id
);
49 if ( found
!= names_
.end() ) {
// NOTE(review): the body of this branch is missing from this view;
// presumably it returns the stored name -- confirm against the full file.
// The assertion below checks that next_ has not reached the NUL terminator
// of alpha, i.e. that a letter is still available.
52 BOOST_ASSERT( *next_
);
// Build a one-character name from the current letter and advance the cursor.
53 std::string
name(1, *next_
++ );
// Record the name under this thread's id.
54 names_
[ this_id
] = name
;
// File-scope registry instance used by the code below.
59 ThreadNames thread_names
;
61 /*****************************************************************************
63 *****************************************************************************/
// Interior of the fiber-name registry class. Its header is not visible in
// this view; the defaulted constructor below shows the class is FiberNames.
// names_ maps a fiber id to the name assigned to that fiber.
66 std::map
<boost::fibers::fiber::id
, std::string
> names_
{};
// boost::fibers::mutex locked by lookup() while it touches names_.
68 boost::fibers::mutex mtx_
{};
71 FiberNames() = default;
// Returns a std::string name for the calling fiber, keyed by
// boost::this_fiber::get_id(), while holding a lock on mtx_.
73 std::string
lookup() {
74 std::unique_lock
<boost::fibers::mutex
> lk( mtx_
);
75 auto this_id( boost::this_fiber::get_id() );
76 auto found
= names_
.find( this_id
);
77 if ( found
!= names_
.end() ) {
// NOTE(review): the body of this branch is missing from this view;
// presumably it returns the stored name -- confirm against the full file.
80 std::ostringstream out
;
81 // Bake into the fiber's name the thread name on which we first
82 // lookup() its ID, to be able to spot when a fiber hops between
// The name is the calling thread's name followed by the current value of
// next_, which is post-incremented (next_ is declared outside this view).
84 out
<< thread_names
.lookup() << next_
++;
85 std::string
name( out
.str() );
// Record the name under this fiber's id.
86 names_
[ this_id
] = name
;
// File-scope registry instance used by the code below.
91 FiberNames fiber_names
;
// Fragment of a helper whose signature and return statement are not visible
// in this view; presumably the body of tag(), which the functions below call
// to prefix their log lines -- confirm against the full file.
// Formats "Thread <thread name>: <fiber name>", with the fiber name written
// at field width 4 and the width reset to 0 afterwards.
94 std::ostringstream out
;
95 out
<< "Thread " << thread_names
.lookup() << ": "
96 << std::setw(4) << fiber_names
.lookup() << std::setw(0);
100 /*****************************************************************************
102 *****************************************************************************/
// print_ overload taking only the stream: the end of the recursion below.
// NOTE(review): its body is missing from this view.
103 void print_( std::ostream
& out
) {
// print_ overload taking a first argument plus a parameter pack: it recurses
// on the remaining pack with the same stream.
// NOTE(review): the line that presumably writes arg to out is missing from
// this view -- confirm against the full file.
107 template < typename T
, typename
... Ts
>
108 void print_( std::ostream
& out
, T
const& arg
, Ts
const&... args
) {
110 print_(out
, args
...);
// Formats all arguments into a local std::ostringstream via print_, then
// sends the finished text to std::cout in one insertion followed by a flush.
// NOTE(review): buffering first presumably keeps output from different
// fibers/threads from interleaving mid-line -- confirm intent.
113 template < typename
... T
>
114 void print( T
const&... args
) {
115 std::ostringstream buffer
;
116 print_( buffer
, args
...);
117 std::cout
<< buffer
.str() << std::flush
;
120 /*****************************************************************************
121 * fiber function per server connection
122 *****************************************************************************/
// Echoes data on one accepted connection: read some bytes, log them, write
// the same bytes back.
// NOTE(review): the try and loop openers, and the else-branches around the
// throw statements, are missing from this view; the break and catch below
// imply they exist -- confirm against the full file.
123 void session( socket_ptr sock
) {
// Fixed-size receive buffer of max_length bytes.
126 char data
[max_length
];
127 boost::system::error_code ec
;
// Read whatever is available; boost::fibers::asio::yield[ec] is passed as
// the completion token and ec receives the error code.
128 std::size_t length
= sock
->async_read_some(
129 boost::asio::buffer( data
),
130 boost::fibers::asio::yield
[ec
]);
131 if ( ec
== boost::asio::error::eof
) {
132 break; //connection closed cleanly by peer
134 throw boost::system::system_error( ec
); //some other error
136 print( tag(), ": handled: ", std::string(data
, length
));
// Write back exactly the bytes that were read (first argument of the call
// is missing from this view).
137 boost::asio::async_write(
139 boost::asio::buffer( data
, length
),
140 boost::fibers::asio::yield
[ec
]);
141 if ( ec
== boost::asio::error::eof
) {
142 break; //connection closed cleanly by peer
144 throw boost::system::system_error( ec
); //some other error
147 print( tag(), ": connection closed");
// Any std::exception raised above is logged rather than propagated.
148 } catch ( std::exception
const& ex
) {
149 print( tag(), ": caught exception : ", ex
.what());
153 /*****************************************************************************
155 *****************************************************************************/
// Accept side of the echo server: creates a socket on the shared io_service,
// accepts a connection into it, and launches a detached session fiber for it.
// NOTE(review): the try and loop openers and the start of the accept call
// are missing from this view -- confirm against the full file.
156 void server( std::shared_ptr
< boost::asio::io_service
> const& io_svc
, tcp::acceptor
& a
) {
157 print( tag(), ": echo-server started");
// New socket owned through socket_ptr so the session fiber can share it.
160 socket_ptr
socket( new tcp::socket( * io_svc
) );
161 boost::system::error_code ec
;
// Tail of a call that takes yield[ec] as its completion token (presumably
// the acceptor's async accept; the opening lines are not visible).
164 boost::fibers::asio::yield
[ec
]);
166 throw boost::system::system_error( ec
); //some other error
// Hand the connected socket to a new fiber running session() and detach it.
168 boost::fibers::fiber( session
, socket
).detach();
// Any std::exception raised above is logged rather than propagated.
171 } catch ( std::exception
const& ex
) {
172 print( tag(), ": caught exception : ", ex
.what());
175 print( tag(), ": echo-server stopped");
178 /*****************************************************************************
179 * fiber function per client
180 *****************************************************************************/
// Echo client: for each of `iterations` rounds it resolves 127.0.0.1:9999,
// connects a fresh socket, sends one message and logs the reply; afterwards
// it waits on `barrier` with the other client fibers.
// NOTE(review): several lines (the write target, else-branches, closing
// braces, and what the barrier winner does with `a`) are missing from this
// view -- confirm against the full file.
181 void client( std::shared_ptr
< boost::asio::io_service
> const& io_svc
, tcp::acceptor
& a
,
182 boost::fibers::barrier
& barrier
, unsigned iterations
) {
183 print( tag(), ": echo-client started");
184 for (unsigned count
= 0; count
< iterations
; ++count
) {
185 tcp::resolver
resolver( * io_svc
);
186 tcp::resolver::query
query( tcp::v4(), "127.0.0.1", "9999");
187 tcp::resolver::iterator iterator
= resolver
.resolve( query
);
188 tcp::socket
s( * io_svc
);
// resolve() above and connect() here are called without a yield token,
// unlike the reads and writes below.
189 boost::asio::connect( s
, iterator
);
// Inner loop runs exactly once per connection (msg < 1).
190 for (unsigned msg
= 0; msg
< 1; ++msg
) {
// Message text: "from <fiber name> <count>.<msg>".
191 std::ostringstream msgbuf
;
192 msgbuf
<< "from " << fiber_names
.lookup() << " " << count
<< "." << msg
;
193 std::string
message(msgbuf
.str());
194 print( tag(), ": Sending: ", message
);
195 boost::system::error_code ec
;
196 boost::asio::async_write(
198 boost::asio::buffer( message
),
199 boost::fibers::asio::yield
[ec
]);
// NOTE(review): returning here leaves the function before barrier.wait()
// below; if the barrier expects every client, the others may never be
// released -- confirm this is acceptable.
200 if ( ec
== boost::asio::error::eof
) {
201 return; //connection closed cleanly by peer
203 throw boost::system::system_error( ec
); //some other error
// Receive buffer for the echoed reply, max_length bytes.
205 char reply
[max_length
];
206 size_t reply_length
= s
.async_read_some(
207 boost::asio::buffer( reply
, max_length
),
208 boost::fibers::asio::yield
[ec
]);
209 if ( ec
== boost::asio::error::eof
) {
210 return; //connection closed cleanly by peer
212 throw boost::system::system_error( ec
); //some other error
214 print( tag(), ": Reply : ", std::string( reply
, reply_length
));
217 // done with all iterations, wait for rest of client fibers
218 if ( barrier
.wait()) {
219 // exactly one barrier.wait() call returns true
220 // we're the lucky one
222 print( tag(), ": acceptor stopped");
224 print( tag(), ": echo-client stopped");
227 /*****************************************************************************
229 *****************************************************************************/
// Program entry: creates the shared io_service, installs the asio-based
// round_robin fiber scheduler on it, starts one server fiber and `clients`
// client fibers, and logs progress.
// NOTE(review): the try opener, the call that runs the io_service, and the
// end of the function are missing from this view -- confirm against the
// full file.
230 int main( int argc
, char* argv
[]) {
233 std::shared_ptr
< boost::asio::io_service
> io_svc
= std::make_shared
< boost::asio::io_service
>();
// Select the scheduling algorithm, passing it the io_service.
234 boost::fibers::use_scheduling_algorithm
< boost::fibers::asio::round_robin
>( io_svc
);
236 print( "Thread ", thread_names
.lookup(), ": started");
237 //[asio_rr_launch_fibers
// Acceptor bound to IPv4 port 9999, the same port the clients connect to.
239 tcp::acceptor
a( * io_svc
, tcp::endpoint( tcp::v4(), 9999) );
// Server fiber receives the acceptor by reference and is detached.
240 boost::fibers::fiber( server
, io_svc
, std::ref( a
) ).detach();
242 const unsigned iterations
= 2;
243 const unsigned clients
= 3;
// Barrier sized to the number of client fibers; each client waits on it
// after finishing its iterations.
244 boost::fibers::barrier
b( clients
);
245 for ( unsigned i
= 0; i
< clients
; ++i
) {
246 boost::fibers::fiber(
247 client
, io_svc
, std::ref( a
), std::ref( b
), iterations
).detach();
253 print( tag(), ": io_service returned");
254 print( "Thread ", thread_names
.lookup(), ": stopping");
255 std::cout
<< "done." << std::endl
;
// Any std::exception raised above is reported through print().
257 } catch ( std::exception
const& e
) {
258 print("Exception: ", e
.what(), "\n");