]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "OSDMapMapping.h" | |
5 | #include "OSDMap.h" | |
6 | ||
7 | #define dout_subsys ceph_subsys_mon | |
8 | ||
9 | #include "common/debug.h" | |
10 | ||
11 | MEMPOOL_DEFINE_OBJECT_FACTORY(OSDMapMapping, osdmapmapping, | |
12 | osdmap_mapping); | |
13 | ||
14 | // ensure that we have a PoolMappings for each pool and that | |
15 | // the dimensions (pg_num and size) match up. | |
16 | void OSDMapMapping::_init_mappings(const OSDMap& osdmap) | |
17 | { | |
18 | num_pgs = 0; | |
19 | auto q = pools.begin(); | |
20 | for (auto& p : osdmap.get_pools()) { | |
21 | num_pgs += p.second.get_pg_num(); | |
22 | // drop unneeded pools | |
23 | while (q != pools.end() && q->first < p.first) { | |
24 | q = pools.erase(q); | |
25 | } | |
26 | if (q != pools.end() && q->first == p.first) { | |
27 | if (q->second.pg_num != p.second.get_pg_num() || | |
28 | q->second.size != p.second.get_size()) { | |
29 | // pg_num changed | |
30 | q = pools.erase(q); | |
31 | } else { | |
32 | // keep it | |
33 | ++q; | |
34 | continue; | |
35 | } | |
36 | } | |
37 | pools.emplace(p.first, PoolMapping(p.second.get_size(), | |
11fdf7f2 TL |
38 | p.second.get_pg_num(), |
39 | p.second.is_erasure())); | |
7c673cae FG |
40 | } |
41 | pools.erase(q, pools.end()); | |
11fdf7f2 | 42 | ceph_assert(pools.size() == osdmap.get_pools().size()); |
7c673cae FG |
43 | } |
44 | ||
45 | void OSDMapMapping::update(const OSDMap& osdmap) | |
46 | { | |
47 | _start(osdmap); | |
48 | for (auto& p : osdmap.get_pools()) { | |
49 | _update_range(osdmap, p.first, 0, p.second.get_pg_num()); | |
50 | } | |
51 | _finish(osdmap); | |
52 | //_dump(); // for debugging | |
53 | } | |
54 | ||
55 | void OSDMapMapping::update(const OSDMap& osdmap, pg_t pgid) | |
56 | { | |
57 | _update_range(osdmap, pgid.pool(), pgid.ps(), pgid.ps() + 1); | |
58 | } | |
59 | ||
60 | void OSDMapMapping::_build_rmap(const OSDMap& osdmap) | |
61 | { | |
62 | acting_rmap.resize(osdmap.get_max_osd()); | |
63 | //up_rmap.resize(osdmap.get_max_osd()); | |
64 | for (auto& v : acting_rmap) { | |
65 | v.resize(0); | |
66 | } | |
67 | //for (auto& v : up_rmap) { | |
68 | // v.resize(0); | |
69 | //} | |
70 | for (auto& p : pools) { | |
71 | pg_t pgid(0, p.first); | |
72 | for (unsigned ps = 0; ps < p.second.pg_num; ++ps) { | |
73 | pgid.set_ps(ps); | |
74 | int32_t *row = &p.second.table[p.second.row_size() * ps]; | |
75 | for (int i = 0; i < row[2]; ++i) { | |
76 | if (row[4 + i] != CRUSH_ITEM_NONE) { | |
77 | acting_rmap[row[4 + i]].push_back(pgid); | |
78 | } | |
79 | } | |
80 | //for (int i = 0; i < row[3]; ++i) { | |
81 | //up_rmap[row[4 + p.second.size + i]].push_back(pgid); | |
82 | //} | |
83 | } | |
84 | } | |
85 | } | |
86 | ||
87 | void OSDMapMapping::_finish(const OSDMap& osdmap) | |
88 | { | |
89 | _build_rmap(osdmap); | |
90 | epoch = osdmap.get_epoch(); | |
91 | } | |
92 | ||
93 | void OSDMapMapping::_dump() | |
94 | { | |
95 | for (auto& p : pools) { | |
96 | cout << "pool " << p.first << std::endl; | |
97 | for (unsigned i = 0; i < p.second.table.size(); ++i) { | |
98 | cout << " " << p.second.table[i]; | |
99 | if (i % p.second.row_size() == p.second.row_size() - 1) | |
100 | cout << std::endl; | |
101 | } | |
102 | } | |
103 | } | |
104 | ||
105 | void OSDMapMapping::_update_range( | |
106 | const OSDMap& osdmap, | |
107 | int64_t pool, | |
108 | unsigned pg_begin, | |
109 | unsigned pg_end) | |
110 | { | |
111 | auto i = pools.find(pool); | |
11fdf7f2 TL |
112 | ceph_assert(i != pools.end()); |
113 | ceph_assert(pg_begin <= pg_end); | |
114 | ceph_assert(pg_end <= i->second.pg_num); | |
7c673cae FG |
115 | for (unsigned ps = pg_begin; ps < pg_end; ++ps) { |
116 | vector<int> up, acting; | |
117 | int up_primary, acting_primary; | |
118 | osdmap.pg_to_up_acting_osds( | |
119 | pg_t(ps, pool), | |
120 | &up, &up_primary, &acting, &acting_primary); | |
121 | i->second.set(ps, std::move(up), up_primary, | |
122 | std::move(acting), acting_primary); | |
123 | } | |
124 | } | |
125 | ||
126 | // --------------------------- | |
127 | ||
128 | void ParallelPGMapper::Job::finish_one() | |
129 | { | |
130 | Context *fin = nullptr; | |
131 | { | |
11fdf7f2 | 132 | std::lock_guard l(lock); |
7c673cae FG |
133 | if (--shards == 0) { |
134 | if (!aborted) { | |
135 | finish = ceph_clock_now(); | |
136 | complete(); | |
137 | } | |
138 | cond.Signal(); | |
139 | fin = onfinish; | |
140 | onfinish = nullptr; | |
141 | } | |
142 | } | |
143 | if (fin) { | |
144 | fin->complete(0); | |
145 | } | |
146 | } | |
147 | ||
148 | void ParallelPGMapper::WQ::_process(Item *i, ThreadPool::TPHandle &h) | |
149 | { | |
494da23a TL |
150 | ldout(m->cct, 20) << __func__ << " " << i->job << " pool " << i->pool |
151 | << " [" << i->begin << "," << i->end << ")" | |
152 | << " pgs " << i->pgs | |
153 | << dendl; | |
154 | if (!i->pgs.empty()) | |
155 | i->job->process(i->pgs); | |
156 | else | |
157 | i->job->process(i->pool, i->begin, i->end); | |
7c673cae FG |
158 | i->job->finish_one(); |
159 | delete i; | |
160 | } | |
161 | ||
162 | void ParallelPGMapper::queue( | |
163 | Job *job, | |
494da23a TL |
164 | unsigned pgs_per_item, |
165 | const vector<pg_t>& input_pgs) | |
7c673cae | 166 | { |
224ce89b | 167 | bool any = false; |
494da23a TL |
168 | if (!input_pgs.empty()) { |
169 | unsigned i = 0; | |
170 | vector<pg_t> item_pgs; | |
171 | item_pgs.reserve(pgs_per_item); | |
172 | for (auto& pg : input_pgs) { | |
173 | if (i < pgs_per_item) { | |
174 | ++i; | |
175 | item_pgs.push_back(pg); | |
176 | } | |
177 | if (i >= pgs_per_item) { | |
178 | job->start_one(); | |
179 | wq.queue(new Item(job, item_pgs)); | |
180 | i = 0; | |
181 | item_pgs.clear(); | |
182 | any = true; | |
183 | } | |
184 | } | |
185 | if (!item_pgs.empty()) { | |
186 | job->start_one(); | |
187 | wq.queue(new Item(job, item_pgs)); | |
188 | any = true; | |
189 | } | |
190 | ceph_assert(any); | |
191 | return; | |
192 | } | |
193 | // no input pgs, load all from map | |
7c673cae FG |
194 | for (auto& p : job->osdmap->get_pools()) { |
195 | for (unsigned ps = 0; ps < p.second.get_pg_num(); ps += pgs_per_item) { | |
11fdf7f2 | 196 | unsigned ps_end = std::min(ps + pgs_per_item, p.second.get_pg_num()); |
7c673cae FG |
197 | job->start_one(); |
198 | wq.queue(new Item(job, p.first, ps, ps_end)); | |
199 | ldout(cct, 20) << __func__ << " " << job << " " << p.first << " [" << ps | |
200 | << "," << ps_end << ")" << dendl; | |
224ce89b | 201 | any = true; |
7c673cae FG |
202 | } |
203 | } | |
11fdf7f2 | 204 | ceph_assert(any); |
7c673cae | 205 | } |