]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/fiber/src/algo/work_stealing.cpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / libs / fiber / src / algo / work_stealing.cpp
1
2 // Copyright Oliver Kowalke 2015.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 //
7
8 #include "boost/fiber/algo/work_stealing.hpp"
9
10 #include <random>
11
12 #include <boost/assert.hpp>
13 #include <boost/context/detail/prefetch.hpp>
14
15 #include "boost/fiber/type.hpp"
16
17 #ifdef BOOST_HAS_ABI_HEADERS
18 # include BOOST_ABI_PREFIX
19 #endif
20
21 namespace boost {
22 namespace fibers {
23 namespace algo {
24
// Process-wide counter handing out scheduler ids; each constructed
// work_stealing instance takes the next value (see the constructor).
std::atomic< std::uint32_t > work_stealing::counter_{ 0 };
// Shared registry of all work_stealing schedulers, indexed by id_;
// sized once via init_() and read by pick_next() when stealing.
std::vector< intrusive_ptr< work_stealing > > work_stealing::schedulers_{};
27
28 void
29 work_stealing::init_( std::uint32_t thread_count,
30 std::vector< intrusive_ptr< work_stealing > > & schedulers) {
31 // resize array of schedulers to thread_count, initilized with nullptr
32 std::vector< intrusive_ptr< work_stealing > >{ thread_count, nullptr }.swap( schedulers);
33 }
34
// Construct one scheduler of a `thread_count`-sized work-stealing group.
// `suspend` enables blocking in suspend_until()/notify() instead of spinning.
// NOTE(review): id_ is drawn from a process-global counter, so the group is
// expected to create exactly `thread_count` instances total (one per thread);
// a further instance would index schedulers_ out of range — confirm callers.
work_stealing::work_stealing( std::uint32_t thread_count, bool suspend) :
    id_{ counter_++ },                 // unique slot in schedulers_
    thread_count_{ thread_count },
    suspend_{ suspend } {
    // initialize the array of schedulers
    // (call_once: only the first-constructed instance performs the resize)
    static std::once_flag flag;
    std::call_once( flag, & work_stealing::init_, thread_count_, std::ref( schedulers_) );
    // register pointer of this scheduler
    schedulers_[id_] = this;
}
45
46 void
47 work_stealing::awakened( context * ctx) noexcept {
48 if ( ! ctx->is_context( type::pinned_context) ) {
49 ctx->detach();
50 }
51 rqueue_.push( ctx);
52 }
53
54 context *
55 work_stealing::pick_next() noexcept {
56 context * victim = rqueue_.pop();
57 if ( nullptr != victim) {
58 boost::context::detail::prefetch_range( victim, sizeof( context) );
59 if ( ! victim->is_context( type::pinned_context) ) {
60 context::active()->attach( victim);
61 }
62 } else {
63 std::uint32_t id = 0;
64 std::size_t count = 0, size = schedulers_.size();
65 static thread_local std::minstd_rand generator{ std::random_device{}() };
66 std::uniform_int_distribution< std::uint32_t > distribution{
67 0, static_cast< std::uint32_t >( thread_count_ - 1) };
68 do {
69 do {
70 ++count;
71 // random selection of one logical cpu
72 // that belongs to the local NUMA node
73 id = distribution( generator);
74 // prevent stealing from own scheduler
75 } while ( id == id_);
76 // steal context from other scheduler
77 victim = schedulers_[id]->steal();
78 } while ( nullptr == victim && count < size);
79 if ( nullptr != victim) {
80 boost::context::detail::prefetch_range( victim, sizeof( context) );
81 BOOST_ASSERT( ! victim->is_context( type::pinned_context) );
82 context::active()->attach( victim);
83 }
84 }
85 return victim;
86 }
87
88 void
89 work_stealing::suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept {
90 if ( suspend_) {
91 if ( (std::chrono::steady_clock::time_point::max)() == time_point) {
92 std::unique_lock< std::mutex > lk{ mtx_ };
93 cnd_.wait( lk, [this](){ return flag_; });
94 flag_ = false;
95 } else {
96 std::unique_lock< std::mutex > lk{ mtx_ };
97 cnd_.wait_until( lk, time_point, [this](){ return flag_; });
98 flag_ = false;
99 }
100 }
101 }
102
103 void
104 work_stealing::notify() noexcept {
105 if ( suspend_) {
106 std::unique_lock< std::mutex > lk{ mtx_ };
107 flag_ = true;
108 lk.unlock();
109 cnd_.notify_all();
110 }
111 }
112
113 }}}
114
115 #ifdef BOOST_HAS_ABI_HEADERS
116 # include BOOST_ABI_SUFFIX
117 #endif