]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/fiber/src/algo/numa/work_stealing.cpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / libs / fiber / src / algo / numa / work_stealing.cpp
1
2 // Copyright Oliver Kowalke 2015.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 //
7
8 #include "boost/fiber/algo/numa/work_stealing.hpp"
9
10 #include <cmath>
11 #include <random>
12
13 #include <boost/assert.hpp>
14 #include <boost/context/detail/prefetch.hpp>
15
16 #include "boost/fiber/type.hpp"
17
18 #ifdef BOOST_HAS_ABI_HEADERS
19 # include BOOST_ABI_PREFIX
20 #endif
21
22 namespace boost {
23 namespace fibers {
24 namespace algo {
25 namespace numa {
26
27 std::vector< intrusive_ptr< work_stealing > > work_stealing::schedulers_{};
28
29 std::vector< std::uint32_t > get_local_cpus( std::uint32_t node_id, std::vector< boost::fibers::numa::node > const& topo) {
30 for ( auto & node : topo) {
31 if ( node_id == node.id) {
32 // store IDs of logical cpus that belong to this local NUMA node
33 return std::vector< std::uint32_t >{ node.logical_cpus.begin(), node.logical_cpus.end() };
34 }
35 }
36 return std::vector< std::uint32_t >{};
37 }
38
39 std::vector< std::uint32_t > get_remote_cpus( std::uint32_t node_id, std::vector< boost::fibers::numa::node > const& topo) {
40 std::vector< std::uint32_t > remote_cpus;
41 for ( auto & node : topo) {
42 if ( node_id != node.id) {
43 // store IDs of logical cpus that belong to a remote NUMA node
44 // no ordering regarding to the NUMA distance
45 remote_cpus.insert( remote_cpus.end(), node.logical_cpus.begin(), node.logical_cpus.end() );
46 }
47 }
48 return remote_cpus;
49 }
50
51 void
52 work_stealing::init_( std::vector< boost::fibers::numa::node > const& topo,
53 std::vector< intrusive_ptr< work_stealing > > & schedulers) {
54 std::uint32_t max_cpu_id = 0;
55 for ( auto & node : topo) {
56 max_cpu_id = (std::max)( max_cpu_id, * node.logical_cpus.rbegin() );
57 }
58 // resize array of schedulers to max. CPU ID, initilized with nullptr
59 // CPU ID acts as the index in the scheduler array
60 // if a logical cpus is offline, schedulers_ will contain a nullptr
61 // logical cpus index starts at `0`
62 std::vector< intrusive_ptr< work_stealing > >{ max_cpu_id + 1, nullptr }.swap( schedulers);
63 }
64
65 work_stealing::work_stealing(
66 std::uint32_t cpu_id,
67 std::uint32_t node_id,
68 std::vector< boost::fibers::numa::node > const& topo,
69 bool suspend) :
70 cpu_id_{ cpu_id },
71 local_cpus_{ get_local_cpus( node_id, topo) },
72 remote_cpus_{ get_remote_cpus( node_id, topo) },
73 suspend_{ suspend } {
74 // pin current thread to logical cpu
75 boost::fibers::numa::pin_thread( cpu_id_);
76 // initialize the array of schedulers
77 static std::once_flag flag;
78 std::call_once( flag, & work_stealing::init_, topo, std::ref( schedulers_) );
79 // register pointer of this scheduler
80 schedulers_[cpu_id_] = this;
81 }
82
83 void
84 work_stealing::awakened( context * ctx) noexcept {
85 if ( ! ctx->is_context( type::pinned_context) ) {
86 ctx->detach();
87 }
88 rqueue_.push( ctx);
89 }
90
91 context *
92 work_stealing::pick_next() noexcept {
93 context * victim = rqueue_.pop();
94 if ( nullptr != victim) {
95 boost::context::detail::prefetch_range( victim, sizeof( context) );
96 if ( ! victim->is_context( type::pinned_context) ) {
97 context::active()->attach( victim);
98 }
99 } else {
100 std::uint32_t cpu_id = 0;
101 std::size_t count = 0, size = local_cpus_.size();
102 static thread_local std::minstd_rand generator{ std::random_device{}() };
103 std::uniform_int_distribution< std::uint32_t > local_distribution{
104 0, static_cast< std::uint32_t >( local_cpus_.size() - 1) };
105 std::uniform_int_distribution< std::uint32_t > remote_distribution{
106 0, static_cast< std::uint32_t >( remote_cpus_.size() - 1) };
107 do {
108 do {
109 ++count;
110 // random selection of one logical cpu
111 // that belongs to the local NUMA node
112 cpu_id = local_cpus_[local_distribution( generator)];
113 // prevent stealing from own scheduler
114 } while ( cpu_id == cpu_id_);
115 // steal context from other scheduler
116 // schedulers_[cpu_id] should never contain a nullptr
117 BOOST_ASSERT( nullptr != schedulers_[cpu_id]);
118 victim = schedulers_[cpu_id]->steal();
119 } while ( nullptr == victim && count < size);
120 if ( nullptr != victim) {
121 boost::context::detail::prefetch_range( victim, sizeof( context) );
122 BOOST_ASSERT( ! victim->is_context( type::pinned_context) );
123 context::active()->attach( victim);
124 } else if ( ! remote_cpus_.empty() ) {
125 cpu_id = 0;
126 count = 0;
127 size = remote_cpus_.size();
128 do {
129 ++count;
130 // random selection of one logical cpu
131 // that belongs to a remote NUMA node
132 cpu_id = remote_cpus_[remote_distribution( generator)];
133 // remote cpu ID should never be equal to local cpu ID
134 BOOST_ASSERT( cpu_id != cpu_id_);
135 // schedulers_[cpu_id] should never contain a nullptr
136 BOOST_ASSERT( nullptr != schedulers_[cpu_id]);
137 // steal context from other scheduler
138 victim = schedulers_[cpu_id]->steal();
139 } while ( nullptr == victim && count < size);
140 if ( nullptr != victim) {
141 boost::context::detail::prefetch_range( victim, sizeof( context) );
142 BOOST_ASSERT( ! victim->is_context( type::pinned_context) );
143 // move memory from remote NUMA-node to
144 // memory of local NUMA-node
145 context::active()->attach( victim);
146 }
147 }
148 }
149 return victim;
150 }
151
152 void
153 work_stealing::suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept {
154 if ( suspend_) {
155 if ( (std::chrono::steady_clock::time_point::max)() == time_point) {
156 std::unique_lock< std::mutex > lk{ mtx_ };
157 cnd_.wait( lk, [this](){ return flag_; });
158 flag_ = false;
159 } else {
160 std::unique_lock< std::mutex > lk{ mtx_ };
161 cnd_.wait_until( lk, time_point, [this](){ return flag_; });
162 flag_ = false;
163 }
164 }
165 }
166
167 void
168 work_stealing::notify() noexcept {
169 if ( suspend_) {
170 std::unique_lock< std::mutex > lk{ mtx_ };
171 flag_ = true;
172 lk.unlock();
173 cnd_.notify_all();
174 }
175 }
176
177 }}}}
178
179 #ifdef BOOST_HAS_ABI_HEADERS
180 # include BOOST_ABI_SUFFIX
181 #endif