]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | # -*- coding: utf-8 -*- |
2 | from __future__ import absolute_import | |
3 | ||
eafe8130 | 4 | import time |
11fdf7f2 TL |
5 | import cherrypy |
6 | ||
7 | from . import ApiController, RESTController, Endpoint, ReadPermission, Task | |
8 | from .. import mgr | |
9 | from ..security import Scope | |
10 | from ..services.ceph_service import CephService | |
11 | from ..services.rbd import RbdConfiguration | |
12 | from ..services.exception import handle_send_command_error | |
eafe8130 | 13 | from ..tools import str_to_bool, TaskManager |
11fdf7f2 TL |
14 | |
15 | ||
def pool_task(name, metadata, wait_for=2.0):
    """
    Build the task decorator used by the pool endpoints.

    :param name: Operation suffix; the task is named ``pool/<name>``.
    :param metadata: Task metadata, forwarded unchanged to ``Task``.
    :param wait_for: Forwarded unchanged to ``Task`` (defaults to 2.0).
    :return: The ``Task`` instance, applied as a decorator by the callers.
    """
    task_name = "pool/{}".format(name)
    return Task(task_name, metadata, wait_for)
18 | ||
19 | ||
@ApiController('/pool', Scope.POOL)
class Pool(RESTController):
    """REST controller for listing, reading, creating, editing and deleting pools."""

    @staticmethod
    def _serialize_pool(pool, attrs):
        """
        Reduce a raw pool dict to the requested attributes.

        :param pool: The dict that represents a pool.
        :type pool: dict
        :param attrs: Attribute names to keep. Anything that is empty or not a
            list selects every key of ``pool``.
        :type attrs: list | None
        :return: The selected attributes; ``pool_name`` is always included.
        :rtype: dict
        """
        if not attrs or not isinstance(attrs, list):
            attrs = pool.keys()

        crush_rules = {r['rule_id']: r["rule_name"] for r in mgr.get('osd_map_crush')['rules']}

        res = {}
        for attr in attrs:
            if attr not in pool:
                # Requested attributes the pool does not have are silently skipped.
                continue
            if attr == 'type':
                # Translate the numeric pool type into its name.
                res[attr] = {1: 'replicated', 3: 'erasure'}[pool[attr]]
            elif attr == 'crush_rule':
                # Report the rule by name instead of by id.
                res[attr] = crush_rules[pool[attr]]
            elif attr == 'application_metadata':
                # Only the application names are exposed, not their metadata.
                res[attr] = list(pool[attr].keys())
            else:
                res[attr] = pool[attr]

        # pool_name is mandatory
        res['pool_name'] = pool['pool_name']
        return res

    @classmethod
    def _pool_list(cls, attrs=None, stats=False):
        """
        Return all pools in serialized form.

        :param attrs: Comma separated attribute names to include, or a falsy
            value to include everything.
        :type attrs: str | None
        :param stats: Interpreted with ``str_to_bool``; when true the pool list
            is fetched together with its statistics.
        :return: One serialized dict per pool.
        :rtype: list
        """
        if attrs:
            attrs = attrs.split(',')

        if str_to_bool(stats):
            pools = CephService.get_pool_list_with_stats()
        else:
            pools = CephService.get_pool_list()

        return [cls._serialize_pool(pool, attrs) for pool in pools]

    def list(self, attrs=None, stats=False):
        """Return all pools; see ``_pool_list`` for the parameters."""
        return self._pool_list(attrs, stats)

    @classmethod
    def _get(cls, pool_name, attrs=None, stats=False):
        # type: (str, str, bool) -> dict
        """
        Return the serialized pool called ``pool_name``.

        :raises cherrypy.NotFound: If no pool with that name exists.
        """
        pools = cls._pool_list(attrs, stats)
        pool = [pool for pool in pools if pool['pool_name'] == pool_name]
        if not pool:
            raise cherrypy.NotFound('No such pool')
        return pool[0]

    def get(self, pool_name, attrs=None, stats=False):
        # type: (str, str, bool) -> dict
        """
        Return a single pool, extended by a ``configuration`` entry holding the
        result of ``RbdConfiguration(pool_name).list()``.

        :raises cherrypy.NotFound: If no pool with that name exists.
        """
        pool = self._get(pool_name, attrs, stats)
        pool['configuration'] = RbdConfiguration(pool_name).list()
        return pool

    @pool_task('delete', ['{pool_name}'])
    @handle_send_command_error('pool')
    def delete(self, pool_name):
        """Send the ``osd pool delete`` mon command for ``pool_name``."""
        # The command takes the pool name twice (pool and pool2) plus an
        # explicit confirmation flag.
        return CephService.send_command('mon', 'osd pool delete', pool=pool_name, pool2=pool_name,
                                        yes_i_really_really_mean_it=True)

    @pool_task('edit', ['{pool_name}'])
    def set(self, pool_name, flags=None, application_metadata=None, configuration=None, **kwargs):
        """
        Update an existing pool.

        :param pool_name: The current name of the pool.
        :param flags: Pool flags; only ``ec_overwrites`` is evaluated.
        :param application_metadata: Application names the pool should have
            enabled, or None to leave the applications untouched.
        :param configuration: Passed to ``RbdConfiguration.set_configuration``.
        :param kwargs: Further pool values, each sent via ``osd pool set``.
            A ``pool`` entry renames the pool to the given value.
        """
        self._set_pool_values(pool_name, application_metadata, flags, True, kwargs)
        if 'pool' in kwargs:
            # _set_pool_values renamed the pool, so the old name no longer
            # exists; everything that follows has to address the new name.
            pool_name = kwargs['pool']
        RbdConfiguration(pool_name).set_configuration(configuration)
        self._wait_for_pgs(pool_name)

    @pool_task('create', {'pool_name': '{pool}'})
    @handle_send_command_error('pool')
    def create(self, pool, pg_num, pool_type, erasure_code_profile=None, flags=None,
               application_metadata=None, rule_name=None, configuration=None, **kwargs):
        """
        Create a pool and apply its initial settings.

        :param pool: The name of the new pool.
        :param pg_num: Number of placement groups; also used as ``pgp_num``.
        :param pool_type: Forwarded as ``pool_type`` to ``osd pool create``.
        :param erasure_code_profile: Profile name; falsy values are sent as None.
        :param flags: Pool flags; only ``ec_overwrites`` is evaluated.
        :param application_metadata: Application names to enable, or None.
        :param rule_name: Forwarded as ``rule`` to ``osd pool create``.
        :param configuration: Passed to ``RbdConfiguration.set_configuration``.
        :param kwargs: Further pool values, each sent via ``osd pool set``.
        """
        ecp = erasure_code_profile or None
        pg_count = int(pg_num)
        CephService.send_command('mon', 'osd pool create', pool=pool, pg_num=pg_count,
                                 pgp_num=pg_count, pool_type=pool_type, erasure_code_profile=ecp,
                                 rule=rule_name)
        self._set_pool_values(pool, application_metadata, flags, False, kwargs)
        RbdConfiguration(pool).set_configuration(configuration)
        self._wait_for_pgs(pool)

    def _set_pool_values(self, pool, application_metadata, flags, update_existing, kwargs):
        """
        Apply flags, applications and key/value settings to a pool.

        :param pool: The (current) name of the pool.
        :param application_metadata: Application names that should be enabled
            afterwards, or None to leave the applications untouched.
        :param flags: Pool flags; only ``ec_overwrites`` is evaluated.
        :param update_existing: True when editing an existing pool. The current
            pool state is then used to reset compression options and to work
            out which applications have to be disabled.
        :type update_existing: bool
        :param kwargs: Values sent one by one via ``osd pool set``. A ``pool``
            entry is not a value but the new pool name. May be modified in
            place by ``_handle_update_compression_args``.
        :type kwargs: dict
        """
        current_pool = None
        if update_existing:
            current_pool = self._get(pool)
            self._handle_update_compression_args(current_pool.get('options'), kwargs)
        if flags and 'ec_overwrites' in flags:
            CephService.send_command('mon', 'osd pool set', pool=pool, var='allow_ec_overwrites',
                                     val='true')
        if application_metadata is not None:
            def set_app(what, app):
                CephService.send_command('mon', 'osd pool application ' + what, pool=pool, app=app,
                                         yes_i_really_mean_it=True)
            if update_existing:
                # Tolerate a pool without application metadata instead of
                # failing with a TypeError on set(None).
                original_app_metadata = set(current_pool.get('application_metadata') or [])
            else:
                original_app_metadata = set()
            requested_app_metadata = set(application_metadata)

            for app in original_app_metadata - requested_app_metadata:
                set_app('disable', app)
            for app in requested_app_metadata - original_app_metadata:
                set_app('enable', app)

        def set_key(key, value):
            CephService.send_command('mon', 'osd pool set', pool=pool, var=key, val=str(value))

        for key, value in kwargs.items():
            if key == 'pool':
                # Handled below: the rename has to happen last because all
                # other commands still address the pool by its old name.
                continue
            set_key(key, value)
            if key == 'pg_num':
                # Keep pgp_num in step with pg_num.
                set_key('pgp_num', value)
        if 'pool' in kwargs:
            CephService.send_command('mon', 'osd pool rename', srcpool=pool,
                                     destpool=kwargs['pool'])

    def _handle_update_compression_args(self, options, kwargs):
        """
        Extend ``kwargs`` so that unsetting the compression mode also resets
        the remaining compression options that currently have a truthy value.

        :param options: The current options of the pool, or None.
        :type options: dict | None
        :param kwargs: The values about to be applied; modified in place.
        :type kwargs: dict
        """
        if kwargs.get('compression_mode') == 'unset' and options is not None:
            def reset_arg(arg, value):
                # Only options that are currently set need to be reset.
                if options.get(arg):
                    kwargs[arg] = value
            for arg in ['compression_min_blob_size', 'compression_max_blob_size',
                        'compression_required_ratio']:
                reset_arg(arg, '0')
            reset_arg('compression_algorithm', 'unset')

    @classmethod
    def _wait_for_pgs(cls, pool_name):
        """
        Keep the task waiting until all pg changes are complete.

        :param pool_name: The name of the pool.
        :type pool_name: string
        """
        current_pool = cls._get(pool_name)
        initial_pgs = int(current_pool['pg_placement_num']) + int(current_pool['pg_num'])
        cls._pg_wait_loop(current_pool, initial_pgs)

    @classmethod
    def _pg_wait_loop(cls, pool, initial_pgs):
        """
        Poll the pool until its pg counts have reached their targets, updating
        the progress of the current task on every iteration.

        Returns immediately if the pool dict has no ``pg_num_target`` key.

        :param pool: The dict that represents a pool.
        :type pool: dict
        :param initial_pgs: The sum of pg_placement_num and pg_num before any
            change happened.
        :type initial_pgs: int
        """
        # Implemented as a loop rather than by recursion so that a long running
        # pg change cannot exhaust the interpreter's recursion limit.
        while 'pg_num_target' in pool:
            target = int(pool['pg_num_target']) + int(pool['pg_placement_num_target'])
            current = int(pool['pg_placement_num']) + int(pool['pg_num'])
            if current == target:
                break
            max_diff = abs(target - initial_pgs)
            if max_diff:
                diff = max_diff - abs(target - current)
                percentage = int(round(diff / float(max_diff) * 100))
                # The current count may lie outside the range spanned by the
                # initial and the target count; keep the value a percentage.
                percentage = max(0, min(100, percentage))
                TaskManager.current_task().set_progress(percentage)
            # max_diff == 0 means the initial count already matched the target,
            # so no meaningful progress can be derived; just keep waiting.
            time.sleep(4)
            pool = cls._get(pool['pool_name'])

    @RESTController.Resource()
    @ReadPermission
    def configuration(self, pool_name):
        """Return the result of ``RbdConfiguration(pool_name).list()``."""
        return RbdConfiguration(pool_name).list()

    @Endpoint()
    @ReadPermission
    def _info(self, pool_name=''):
        # type: (str) -> dict
        """
        Used by the create-pool dialog.

        :param pool_name: Optional pool name; when given, the result also
            contains ``pool_options`` for that pool.
        :return: Pool names, crush rules per pool type, OSD information and
            the available compression settings.
        """

        def rules(pool_type):
            # Crush rules whose type matches the numeric pool type.
            return [r
                    for r in mgr.get('osd_map_crush')['rules']
                    if r['type'] == pool_type]

        def all_bluestore():
            return all(o['osd_objectstore'] == 'bluestore'
                       for o in mgr.get('osd_metadata').values())

        def compression_enum(conf_name):
            # Non-empty enum values of the first config option named conf_name.
            return [[v for v in o['enum_values'] if len(v) > 0]
                    for o in mgr.get('config_options')['options']
                    if o['name'] == conf_name][0]

        result = {
            "pool_names": [p['pool_name'] for p in self._pool_list()],
            "crush_rules_replicated": rules(1),
            "crush_rules_erasure": rules(3),
            "is_all_bluestore": all_bluestore(),
            "osd_count": len(mgr.get('osd_map')['osds']),
            "bluestore_compression_algorithm": mgr.get('config')['bluestore_compression_algorithm'],
            "compression_algorithms": compression_enum('bluestore_compression_algorithm'),
            "compression_modes": compression_enum('bluestore_compression_mode'),
        }

        if pool_name:
            result['pool_options'] = RbdConfiguration(pool_name).list()

        return result