// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "OSDMapMapping.h"
#include "OSDMap.h"

#define dout_subsys ceph_subsys_mon

#include "common/debug.h"

using std::vector;

MEMPOOL_DEFINE_OBJECT_FACTORY(OSDMapMapping, osdmapmapping,
                              osdmap_mapping);

// ensure that we have a PoolMapping for each pool and that
// the dimensions (pg_num and size) match up.
void OSDMapMapping::_init_mappings(const OSDMap& osdmap)
{
  num_pgs = 0;
  auto q = pools.begin();
  for (auto& p : osdmap.get_pools()) {
    num_pgs += p.second.get_pg_num();
    // drop unneeded pools
    while (q != pools.end() && q->first < p.first) {
      q = pools.erase(q);
    }
    if (q != pools.end() && q->first == p.first) {
      if (q->second.pg_num != p.second.get_pg_num() ||
          q->second.size != p.second.get_size()) {
        // pg_num or size changed; rebuild this pool's mapping
        q = pools.erase(q);
      } else {
        // keep it
        ++q;
        continue;
      }
    }
    pools.emplace(p.first, PoolMapping(p.second.get_size(),
                                       p.second.get_pg_num(),
                                       p.second.is_erasure()));
  }
  pools.erase(q, pools.end());
  ceph_assert(pools.size() == osdmap.get_pools().size());
}

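// Recompute the pg -> osds tables for every pool in the map, then
// rebuild the reverse map and record the epoch.  A minimal usage
// sketch (hypothetical caller; `osdmap` is any valid OSDMap):
//
//   OSDMapMapping mapping;
//   mapping.update(osdmap);   // full rebuild of all pools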
void OSDMapMapping::update(const OSDMap& osdmap)
{
  _start(osdmap);
  for (auto& p : osdmap.get_pools()) {
    _update_range(osdmap, p.first, 0, p.second.get_pg_num());
  }
  _finish(osdmap);
  //_dump(); // for debugging
}

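// Refresh the mapping for a single PG in place.  Note that this only
// rewrites that PG's row in the forward table; the reverse map and
// the recorded epoch are left untouched.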
void OSDMapMapping::update(const OSDMap& osdmap, pg_t pgid)
{
  _update_range(osdmap, pgid.pool(), pgid.ps(), pgid.ps() + 1);
}

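// Rebuild acting_rmap, the reverse (osd -> pgs) index, by walking every
// row of every pool's table.  The row layout mirrors PoolMapping in
// OSDMapMapping.h: row[0] = acting_primary, row[1] = up_primary,
// row[2] = acting count, row[3] = up count, then the acting osds
// starting at row[4] and the up osds starting at row[4 + size].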
void OSDMapMapping::_build_rmap(const OSDMap& osdmap)
{
  acting_rmap.resize(osdmap.get_max_osd());
  //up_rmap.resize(osdmap.get_max_osd());
  for (auto& v : acting_rmap) {
    v.resize(0);
  }
  //for (auto& v : up_rmap) {
  //  v.resize(0);
  //}
  for (auto& p : pools) {
    pg_t pgid(0, p.first);
    for (unsigned ps = 0; ps < p.second.pg_num; ++ps) {
      pgid.set_ps(ps);
      int32_t *row = &p.second.table[p.second.row_size() * ps];
      for (int i = 0; i < row[2]; ++i) {
        if (row[4 + i] != CRUSH_ITEM_NONE) {
          acting_rmap[row[4 + i]].push_back(pgid);
        }
      }
      //for (int i = 0; i < row[3]; ++i) {
      //  up_rmap[row[4 + p.second.size + i]].push_back(pgid);
      //}
    }
  }
}

void OSDMapMapping::_finish(const OSDMap& osdmap)
{
  _build_rmap(osdmap);
  epoch = osdmap.get_epoch();
}

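// Debugging aid: print each pool's raw mapping table, one row per PG.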
void OSDMapMapping::_dump()
{
  for (auto& p : pools) {
    std::cout << "pool " << p.first << std::endl;
    for (unsigned i = 0; i < p.second.table.size(); ++i) {
      std::cout << " " << p.second.table[i];
      if (i % p.second.row_size() == p.second.row_size() - 1)
        std::cout << std::endl;
    }
  }
}

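// Recompute up/acting for PGs [pg_begin, pg_end) of the given pool by
// asking the OSDMap directly, storing the results in the pool's table.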
void OSDMapMapping::_update_range(
  const OSDMap& osdmap,
  int64_t pool,
  unsigned pg_begin,
  unsigned pg_end)
{
  auto i = pools.find(pool);
  ceph_assert(i != pools.end());
  ceph_assert(pg_begin <= pg_end);
  ceph_assert(pg_end <= i->second.pg_num);
  for (unsigned ps = pg_begin; ps < pg_end; ++ps) {
    std::vector<int> up, acting;
    int up_primary, acting_primary;
    osdmap.pg_to_up_acting_osds(
      pg_t(ps, pool),
      &up, &up_primary, &acting, &acting_primary);
    i->second.set(ps, std::move(up), up_primary,
                  std::move(acting), acting_primary);
  }
}

// ---------------------------

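// Called by each work item as it completes.  The last shard to finish
// (when shards reaches zero) records the finish time, runs complete(),
// and hands the onfinish Context back for completion outside the lock.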
void ParallelPGMapper::Job::finish_one()
{
  Context *fin = nullptr;
  {
    std::lock_guard l(lock);
    if (--shards == 0) {
      if (!aborted) {
        finish = ceph_clock_now();
        complete();
      }
      cond.notify_all();
      fin = onfinish;
      onfinish = nullptr;
    }
  }
  if (fin) {
    fin->complete(0);
  }
}

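// Worker thread entry point: an Item carries either an explicit list of
// PGs or a [begin, end) range within one pool; dispatch accordingly.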
void ParallelPGMapper::WQ::_process(Item *i, ThreadPool::TPHandle &h)
{
  ldout(m->cct, 20) << __func__ << " " << i->job << " pool " << i->pool
                    << " [" << i->begin << "," << i->end << ")"
                    << " pgs " << i->pgs
                    << dendl;
  if (!i->pgs.empty())
    i->job->process(i->pgs);
  else
    i->job->process(i->pool, i->begin, i->end);
  i->job->finish_one();
  delete i;
}

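// Shard the work for a Job into Items of at most pgs_per_item PGs and
// queue them on the work queue.  If input_pgs is empty, every PG of
// every pool in job->osdmap is enumerated instead.  A hypothetical
// caller sketch (MyJob and the wait-for-completion call are assumed,
// not defined in this file):
//
//   MyJob job(&osdmap);          // MyJob : public ParallelPGMapper::Job
//   mapper.queue(&job, 16, {});  // shard the whole map, 16 PGs per item
//   job.wait();                  // block until every shard has finished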
void ParallelPGMapper::queue(
  Job *job,
  unsigned pgs_per_item,
  const vector<pg_t>& input_pgs)
{
  bool any = false;
  if (!input_pgs.empty()) {
    unsigned i = 0;
    vector<pg_t> item_pgs;
    item_pgs.reserve(pgs_per_item);
    for (auto& pg : input_pgs) {
      if (i < pgs_per_item) {
        ++i;
        item_pgs.push_back(pg);
      }
      if (i >= pgs_per_item) {
        job->start_one();
        wq.queue(new Item(job, item_pgs));
        i = 0;
        item_pgs.clear();
        any = true;
      }
    }
    if (!item_pgs.empty()) {
      job->start_one();
      wq.queue(new Item(job, item_pgs));
      any = true;
    }
    ceph_assert(any);
    return;
  }
  // no input pgs, load all from map
  for (auto& p : job->osdmap->get_pools()) {
    for (unsigned ps = 0; ps < p.second.get_pg_num(); ps += pgs_per_item) {
      unsigned ps_end = std::min(ps + pgs_per_item, p.second.get_pg_num());
      job->start_one();
      wq.queue(new Item(job, p.first, ps, ps_end));
      ldout(cct, 20) << __func__ << " " << job << " " << p.first << " [" << ps
                     << "," << ps_end << ")" << dendl;
      any = true;
    }
  }
  ceph_assert(any);
}