]>
Commit | Line | Data |
---|---|---|
abe05a73 XL |
1 | /* |
2 | * Copyright 2016 WebAssembly Community Group participants | |
3 | * | |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | * you may not use this file except in compliance with the License. | |
6 | * You may obtain a copy of the License at | |
7 | * | |
8 | * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | * | |
10 | * Unless required by applicable law or agreed to in writing, software | |
11 | * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | * See the License for the specific language governing permissions and | |
14 | * limitations under the License. | |
15 | */ | |
16 | ||
17 | #include <assert.h> | |
18 | ||
19 | #include <algorithm> | |
20 | #include <iostream> | |
21 | #include <string> | |
22 | ||
23 | #include "threads.h" | |
24 | #include "compiler-support.h" | |
25 | #include "utilities.h" | |
26 | ||
27 | ||
28 | // debugging tools | |
29 | ||
30 | #ifdef BINARYEN_THREAD_DEBUG | |
31 | static std::mutex debug; | |
32 | #define DEBUG_THREAD(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[THREAD " << std::this_thread::get_id() << "] " << x; } | |
33 | #define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL] " << x; } | |
34 | #else | |
35 | #define DEBUG_THREAD(x) | |
36 | #define DEBUG_POOL(x) | |
37 | #endif | |
38 | ||
39 | ||
40 | namespace wasm { | |
41 | ||
42 | // Global thread information | |
43 | ||
44 | static std::unique_ptr<ThreadPool> pool; | |
45 | ||
46 | ||
47 | // Thread | |
48 | ||
49 | Thread::Thread() { | |
50 | assert(!ThreadPool::get()->isRunning()); | |
51 | thread = make_unique<std::thread>(mainLoop, this); | |
52 | } | |
53 | ||
54 | Thread::~Thread() { | |
55 | assert(!ThreadPool::get()->isRunning()); | |
56 | { | |
57 | std::lock_guard<std::mutex> lock(mutex); | |
58 | // notify the thread that it can exit | |
59 | done = true; | |
60 | condition.notify_one(); | |
61 | } | |
62 | thread->join(); | |
63 | } | |
64 | ||
65 | void Thread::work(std::function<ThreadWorkState ()> doWork_) { | |
66 | // TODO: fancy work stealing | |
67 | DEBUG_THREAD("send work to thread\n"); | |
68 | { | |
69 | std::lock_guard<std::mutex> lock(mutex); | |
70 | // notify the thread that it can do some work | |
71 | doWork = doWork_; | |
72 | condition.notify_one(); | |
73 | DEBUG_THREAD("work sent\n"); | |
74 | } | |
75 | } | |
76 | ||
77 | void Thread::mainLoop(void *self_) { | |
78 | auto* self = static_cast<Thread*>(self_); | |
79 | while (1) { | |
80 | DEBUG_THREAD("checking for work\n"); | |
81 | { | |
82 | std::unique_lock<std::mutex> lock(self->mutex); | |
83 | if (self->doWork) { | |
84 | DEBUG_THREAD("doing work\n"); | |
85 | // run tasks until they are all done | |
86 | while (self->doWork() == ThreadWorkState::More) {} | |
87 | self->doWork = nullptr; | |
88 | } else if (self->done) { | |
89 | DEBUG_THREAD("done\n"); | |
90 | return; | |
91 | } | |
92 | } | |
93 | ThreadPool::get()->notifyThreadIsReady(); | |
94 | { | |
95 | std::unique_lock<std::mutex> lock(self->mutex); | |
96 | if (!self->done && !self->doWork) { | |
97 | DEBUG_THREAD("thread waiting\n"); | |
98 | self->condition.wait(lock); | |
99 | } | |
100 | } | |
101 | } | |
102 | } | |
103 | ||
104 | ||
105 | // ThreadPool | |
106 | ||
107 | void ThreadPool::initialize(size_t num) { | |
108 | if (num == 1) return; // no multiple cores, don't create threads | |
109 | DEBUG_POOL("initialize()\n"); | |
110 | std::unique_lock<std::mutex> lock(mutex); | |
111 | ready.store(threads.size()); // initial state before first resetThreadsAreReady() | |
112 | resetThreadsAreReady(); | |
113 | for (size_t i = 0; i < num; i++) { | |
114 | try { | |
115 | threads.emplace_back(make_unique<Thread>()); | |
116 | } catch (std::system_error&) { | |
117 | // failed to create a thread - don't use multithreading, as if num cores == 1 | |
118 | DEBUG_POOL("could not create thread\n"); | |
119 | threads.clear(); | |
120 | return; | |
121 | } | |
122 | } | |
123 | DEBUG_POOL("initialize() waiting\n"); | |
124 | condition.wait(lock, [this]() { return areThreadsReady(); }); | |
125 | DEBUG_POOL("initialize() is done\n"); | |
126 | } | |
127 | ||
128 | size_t ThreadPool::getNumCores() { | |
129 | #if EMSCRIPTEN | |
130 | return 1; | |
131 | #else | |
132 | size_t num = std::max(1U, std::thread::hardware_concurrency()); | |
133 | if (getenv("BINARYEN_CORES")) { | |
134 | num = std::stoi(getenv("BINARYEN_CORES")); | |
135 | } | |
136 | return num; | |
137 | #endif | |
138 | } | |
139 | ||
140 | ThreadPool* ThreadPool::get() { | |
141 | if (!pool) { | |
142 | pool = make_unique<ThreadPool>(); | |
143 | pool->initialize(getNumCores()); | |
144 | } | |
145 | return pool.get(); | |
146 | } | |
147 | ||
148 | void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers) { | |
149 | size_t num = threads.size(); | |
150 | // If no multiple cores, or on a side thread, do not use worker threads | |
151 | if (num == 0) { | |
152 | // just run sequentially | |
153 | DEBUG_POOL("work() sequentially\n"); | |
154 | assert(doWorkers.size() > 0); | |
155 | while (doWorkers[0]() == ThreadWorkState::More) {} | |
156 | return; | |
157 | } | |
158 | // run in parallel on threads | |
159 | // TODO: fancy work stealing | |
160 | DEBUG_POOL("work() on threads\n"); | |
161 | assert(doWorkers.size() == num); | |
162 | assert(!running); | |
163 | running = true; | |
164 | std::unique_lock<std::mutex> lock(mutex); | |
165 | resetThreadsAreReady(); | |
166 | for (size_t i = 0; i < num; i++) { | |
167 | threads[i]->work(doWorkers[i]); | |
168 | } | |
169 | DEBUG_POOL("main thread waiting\n"); | |
170 | condition.wait(lock, [this]() { return areThreadsReady(); }); | |
171 | DEBUG_POOL("main thread waiting\n"); | |
172 | running = false; | |
173 | DEBUG_POOL("work() is done\n"); | |
174 | } | |
175 | ||
176 | size_t ThreadPool::size() { | |
177 | return std::max(size_t(1), threads.size()); | |
178 | } | |
179 | ||
180 | bool ThreadPool::isRunning() { | |
181 | return pool && pool->running; | |
182 | } | |
183 | ||
184 | void ThreadPool::notifyThreadIsReady() { | |
185 | DEBUG_POOL("notify thread is ready\n";) | |
186 | std::lock_guard<std::mutex> lock(mutex); | |
187 | ready.fetch_add(1); | |
188 | condition.notify_one(); | |
189 | } | |
190 | ||
191 | void ThreadPool::resetThreadsAreReady() { | |
192 | DEBUG_POOL("reset threads are ready\n";) | |
193 | auto old = ready.exchange(0); | |
194 | WASM_UNUSED(old); | |
195 | assert(old == threads.size()); | |
196 | } | |
197 | ||
198 | bool ThreadPool::areThreadsReady() { | |
199 | DEBUG_POOL("are threads ready?\n";) | |
200 | return ready.load() == threads.size(); | |
201 | } | |
202 | ||
203 | } // namespace wasm | |
204 |