1 // (C) Copyright 2014 Vicente Botet
3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 // adapted from the example given by Howard Hinnant in
8 #include <boost/config.hpp>
10 #define BOOST_THREAD_VERSION 4
11 //#define BOOST_THREAD_QUEUE_DEPRECATE_OLD
12 #if ! defined BOOST_NO_CXX11_DECLTYPE
13 #define BOOST_RESULT_OF_USE_DECLTYPE
16 #include <boost/thread/scoped_thread.hpp>
18 #include <boost/thread/externally_locked_stream.hpp>
19 typedef boost::externally_locked_stream
<std::ostream
> the_ostream
;
21 typedef std::ostream the_ostream
;
22 typedef std::istream the_istream
;
24 #include <boost/thread/concurrent_queues/sync_queue.hpp>
25 #include <boost/thread/concurrent_queues/queue_adaptor.hpp>
26 #include <boost/thread/concurrent_queues/queue_views.hpp>
27 #include <boost/static_assert.hpp>
28 #include <boost/type_traits.hpp>
30 void producer(the_ostream
&/*mos*/, boost::queue_back
<int> sbq
)
32 using namespace boost
;
38 //mos << "push(" << i << ") " << sbq.size() <<"\n";
39 this_thread::sleep_for(chrono::milliseconds(200));
42 catch(sync_queue_is_closed
&)
44 //mos << "closed !!!\n";
48 //mos << "exception !!!\n";
54 boost::queue_front
<int> sbq
)
56 using namespace boost
;
63 //mos << i << " pull(" << r << ") " << sbq.size() <<"\n";
65 this_thread::sleep_for(chrono::milliseconds(250));
68 catch(sync_queue_is_closed
&)
70 //mos << "closed !!!\n";
74 //mos << "exception !!!\n";
77 void consumer2(the_ostream
&/*mos*/, boost::queue_front
<int> sbq
)
79 using namespace boost
;
84 queue_op_status st
= sbq
.try_pull(r
);
85 if (queue_op_status::closed
== st
) break;
86 if (queue_op_status::success
== st
) {
87 //mos << i << " try_pull(" << r << ")\n";
89 this_thread::sleep_for(chrono::milliseconds(250));
94 //mos << "exception !!!\n";
97 void consumer3(the_ostream
&/*mos*/, boost::queue_front
<int> sbq
)
99 using namespace boost
;
104 queue_op_status res
= sbq
.wait_pull(r
);
105 if (res
==queue_op_status::closed
) break;
106 //mos << i << " wait_pull(" << r << ")\n";
107 this_thread::sleep_for(chrono::milliseconds(250));
112 //mos << "exception !!!\n";
118 using namespace boost
;
121 recursive_mutex terminal_mutex
;
123 externally_locked_stream
<std::ostream
> mcerr(std::cerr
, terminal_mutex
);
124 externally_locked_stream
<std::ostream
> mcout(std::cout
, terminal_mutex
);
125 externally_locked_stream
<std::istream
> mcin(std::cin
, terminal_mutex
);
127 the_ostream
&mcerr
= std::cout
;
128 the_ostream
&mcout
= std::cerr
;
129 //the_istream &mcin = std::cin;
132 queue_adaptor
<sync_queue
<int> > sbq
;
135 mcout
<< "begin of main" << std::endl
;
136 scoped_thread
<> t11(boost::thread(producer
, boost::ref(mcerr
), concurrent::queue_back
<int>(sbq
)));
137 scoped_thread
<> t12(boost::thread(producer
, boost::ref(mcerr
), concurrent::queue_back
<int>(sbq
)));
138 scoped_thread
<> t2(boost::thread(consumer
, boost::ref(mcout
), concurrent::queue_front
<int>(sbq
)));
140 this_thread::sleep_for(chrono::seconds(1));
142 mcout
<< "closed()" << std::endl
;
144 mcout
<< "closed()" << std::endl
;
146 } // all threads joined here.
147 mcout
<< "end of main" << std::endl
;