]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
f67539c2 | 6 | #include "test_util/sync_point_impl.h" |
11fdf7f2 TL |
7 | |
8 | #ifndef NDEBUG | |
f67539c2 | 9 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 TL |
10 | |
11 | void TestKillRandom(std::string kill_point, int odds, | |
12 | const std::string& srcfile, int srcline) { | |
13 | for (auto& p : rocksdb_kill_prefix_blacklist) { | |
14 | if (kill_point.substr(0, p.length()) == p) { | |
15 | return; | |
16 | } | |
17 | } | |
18 | ||
19 | assert(odds > 0); | |
20 | if (odds % 7 == 0) { | |
21 | // class Random uses multiplier 16807, which is 7^5. If odds are | |
22 | // multiplier of 7, there might be limited values generated. | |
23 | odds++; | |
24 | } | |
25 | auto* r = Random::GetTLSInstance(); | |
26 | bool crash = r->OneIn(odds); | |
27 | if (crash) { | |
28 | port::Crash(srcfile, srcline); | |
29 | } | |
30 | } | |
31 | ||
32 | ||
33 | void SyncPoint::Data::LoadDependency(const std::vector<SyncPointPair>& dependencies) { | |
34 | std::lock_guard<std::mutex> lock(mutex_); | |
35 | successors_.clear(); | |
36 | predecessors_.clear(); | |
37 | cleared_points_.clear(); | |
38 | for (const auto& dependency : dependencies) { | |
39 | successors_[dependency.predecessor].push_back(dependency.successor); | |
40 | predecessors_[dependency.successor].push_back(dependency.predecessor); | |
41 | } | |
42 | cv_.notify_all(); | |
43 | } | |
44 | ||
45 | void SyncPoint::Data::LoadDependencyAndMarkers( | |
46 | const std::vector<SyncPointPair>& dependencies, | |
47 | const std::vector<SyncPointPair>& markers) { | |
48 | std::lock_guard<std::mutex> lock(mutex_); | |
49 | successors_.clear(); | |
50 | predecessors_.clear(); | |
51 | cleared_points_.clear(); | |
52 | markers_.clear(); | |
53 | marked_thread_id_.clear(); | |
54 | for (const auto& dependency : dependencies) { | |
55 | successors_[dependency.predecessor].push_back(dependency.successor); | |
56 | predecessors_[dependency.successor].push_back(dependency.predecessor); | |
57 | } | |
58 | for (const auto& marker : markers) { | |
59 | successors_[marker.predecessor].push_back(marker.successor); | |
60 | predecessors_[marker.successor].push_back(marker.predecessor); | |
61 | markers_[marker.predecessor].push_back(marker.successor); | |
62 | } | |
63 | cv_.notify_all(); | |
64 | } | |
65 | ||
66 | bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) { | |
67 | for (const auto& pred : predecessors_[point]) { | |
68 | if (cleared_points_.count(pred) == 0) { | |
69 | return false; | |
70 | } | |
71 | } | |
72 | return true; | |
73 | } | |
74 | ||
75 | void SyncPoint::Data::ClearCallBack(const std::string& point) { | |
76 | std::unique_lock<std::mutex> lock(mutex_); | |
77 | while (num_callbacks_running_ > 0) { | |
78 | cv_.wait(lock); | |
79 | } | |
80 | callbacks_.erase(point); | |
81 | } | |
82 | ||
83 | void SyncPoint::Data::ClearAllCallBacks() { | |
84 | std::unique_lock<std::mutex> lock(mutex_); | |
85 | while (num_callbacks_running_ > 0) { | |
86 | cv_.wait(lock); | |
87 | } | |
88 | callbacks_.clear(); | |
89 | } | |
90 | ||
91 | void SyncPoint::Data::Process(const std::string& point, void* cb_arg) { | |
92 | if (!enabled_) { | |
93 | return; | |
94 | } | |
95 | ||
96 | std::unique_lock<std::mutex> lock(mutex_); | |
97 | auto thread_id = std::this_thread::get_id(); | |
98 | ||
99 | auto marker_iter = markers_.find(point); | |
100 | if (marker_iter != markers_.end()) { | |
101 | for (auto& marked_point : marker_iter->second) { | |
102 | marked_thread_id_.emplace(marked_point, thread_id); | |
103 | } | |
104 | } | |
105 | ||
106 | if (DisabledByMarker(point, thread_id)) { | |
107 | return; | |
108 | } | |
109 | ||
110 | while (!PredecessorsAllCleared(point)) { | |
111 | cv_.wait(lock); | |
112 | if (DisabledByMarker(point, thread_id)) { | |
113 | return; | |
114 | } | |
115 | } | |
116 | ||
117 | auto callback_pair = callbacks_.find(point); | |
118 | if (callback_pair != callbacks_.end()) { | |
119 | num_callbacks_running_++; | |
120 | mutex_.unlock(); | |
121 | callback_pair->second(cb_arg); | |
122 | mutex_.lock(); | |
123 | num_callbacks_running_--; | |
124 | } | |
125 | cleared_points_.insert(point); | |
126 | cv_.notify_all(); | |
127 | } | |
f67539c2 | 128 | } // namespace ROCKSDB_NAMESPACE |
11fdf7f2 | 129 | #endif |