]> git.proxmox.com Git - rustc.git/blame - src/binaryen/src/support/threads.cpp
New upstream version 1.25.0+dfsg1
[rustc.git] / src / binaryen / src / support / threads.cpp
CommitLineData
abe05a73
XL
1/*
2 * Copyright 2016 WebAssembly Community Group participants
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <assert.h>
18
19#include <algorithm>
20#include <iostream>
21#include <string>
22
23#include "threads.h"
24#include "compiler-support.h"
25#include "utilities.h"
26
27
28// debugging tools
29
30#ifdef BINARYEN_THREAD_DEBUG
31static std::mutex debug;
32#define DEBUG_THREAD(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[THREAD " << std::this_thread::get_id() << "] " << x; }
33#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL] " << x; }
34#else
35#define DEBUG_THREAD(x)
36#define DEBUG_POOL(x)
37#endif
38
39
40namespace wasm {
41
42// Global thread information
43
44static std::unique_ptr<ThreadPool> pool;
45
46
47// Thread
48
49Thread::Thread() {
50 assert(!ThreadPool::get()->isRunning());
51 thread = make_unique<std::thread>(mainLoop, this);
52}
53
54Thread::~Thread() {
55 assert(!ThreadPool::get()->isRunning());
56 {
57 std::lock_guard<std::mutex> lock(mutex);
58 // notify the thread that it can exit
59 done = true;
60 condition.notify_one();
61 }
62 thread->join();
63}
64
65void Thread::work(std::function<ThreadWorkState ()> doWork_) {
66 // TODO: fancy work stealing
67 DEBUG_THREAD("send work to thread\n");
68 {
69 std::lock_guard<std::mutex> lock(mutex);
70 // notify the thread that it can do some work
71 doWork = doWork_;
72 condition.notify_one();
73 DEBUG_THREAD("work sent\n");
74 }
75}
76
77void Thread::mainLoop(void *self_) {
78 auto* self = static_cast<Thread*>(self_);
79 while (1) {
80 DEBUG_THREAD("checking for work\n");
81 {
82 std::unique_lock<std::mutex> lock(self->mutex);
83 if (self->doWork) {
84 DEBUG_THREAD("doing work\n");
85 // run tasks until they are all done
86 while (self->doWork() == ThreadWorkState::More) {}
87 self->doWork = nullptr;
88 } else if (self->done) {
89 DEBUG_THREAD("done\n");
90 return;
91 }
92 }
93 ThreadPool::get()->notifyThreadIsReady();
94 {
95 std::unique_lock<std::mutex> lock(self->mutex);
96 if (!self->done && !self->doWork) {
97 DEBUG_THREAD("thread waiting\n");
98 self->condition.wait(lock);
99 }
100 }
101 }
102}
103
104
105// ThreadPool
106
107void ThreadPool::initialize(size_t num) {
108 if (num == 1) return; // no multiple cores, don't create threads
109 DEBUG_POOL("initialize()\n");
110 std::unique_lock<std::mutex> lock(mutex);
111 ready.store(threads.size()); // initial state before first resetThreadsAreReady()
112 resetThreadsAreReady();
113 for (size_t i = 0; i < num; i++) {
114 try {
115 threads.emplace_back(make_unique<Thread>());
116 } catch (std::system_error&) {
117 // failed to create a thread - don't use multithreading, as if num cores == 1
118 DEBUG_POOL("could not create thread\n");
119 threads.clear();
120 return;
121 }
122 }
123 DEBUG_POOL("initialize() waiting\n");
124 condition.wait(lock, [this]() { return areThreadsReady(); });
125 DEBUG_POOL("initialize() is done\n");
126}
127
128size_t ThreadPool::getNumCores() {
129#if EMSCRIPTEN
130 return 1;
131#else
132 size_t num = std::max(1U, std::thread::hardware_concurrency());
133 if (getenv("BINARYEN_CORES")) {
134 num = std::stoi(getenv("BINARYEN_CORES"));
135 }
136 return num;
137#endif
138}
139
140ThreadPool* ThreadPool::get() {
141 if (!pool) {
142 pool = make_unique<ThreadPool>();
143 pool->initialize(getNumCores());
144 }
145 return pool.get();
146}
147
148void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers) {
149 size_t num = threads.size();
150 // If no multiple cores, or on a side thread, do not use worker threads
151 if (num == 0) {
152 // just run sequentially
153 DEBUG_POOL("work() sequentially\n");
154 assert(doWorkers.size() > 0);
155 while (doWorkers[0]() == ThreadWorkState::More) {}
156 return;
157 }
158 // run in parallel on threads
159 // TODO: fancy work stealing
160 DEBUG_POOL("work() on threads\n");
161 assert(doWorkers.size() == num);
162 assert(!running);
163 running = true;
164 std::unique_lock<std::mutex> lock(mutex);
165 resetThreadsAreReady();
166 for (size_t i = 0; i < num; i++) {
167 threads[i]->work(doWorkers[i]);
168 }
169 DEBUG_POOL("main thread waiting\n");
170 condition.wait(lock, [this]() { return areThreadsReady(); });
171 DEBUG_POOL("main thread waiting\n");
172 running = false;
173 DEBUG_POOL("work() is done\n");
174}
175
176size_t ThreadPool::size() {
177 return std::max(size_t(1), threads.size());
178}
179
180bool ThreadPool::isRunning() {
181 return pool && pool->running;
182}
183
184void ThreadPool::notifyThreadIsReady() {
185 DEBUG_POOL("notify thread is ready\n";)
186 std::lock_guard<std::mutex> lock(mutex);
187 ready.fetch_add(1);
188 condition.notify_one();
189}
190
191void ThreadPool::resetThreadsAreReady() {
192 DEBUG_POOL("reset threads are ready\n";)
193 auto old = ready.exchange(0);
194 WASM_UNUSED(old);
195 assert(old == threads.size());
196}
197
198bool ThreadPool::areThreadsReady() {
199 DEBUG_POOL("are threads ready?\n";)
200 return ready.load() == threads.size();
201}
202
203} // namespace wasm
204