import json
import errno
import logging
from functools import wraps

import six
import os
import tempfile
import multiprocessing.pool

from mgr_module import MgrModule
import orchestrator

from . import remotes

try:
    import remoto
    import remoto.process
except ImportError as e:
    remoto = None
    remoto_import_error = str(e)

logger = logging.getLogger(__name__)

# high-level TODO:
#  - bring over some of the protections from ceph-deploy that guard against
#    multiple bootstrapping / initialization

class SSHCompletionMixin(object):
    def __init__(self, result):
        if isinstance(result, multiprocessing.pool.AsyncResult):
            self._result = [result]
        else:
            self._result = result
        assert isinstance(self._result, list)

    @property
    def result(self):
        return list(map(lambda r: r.get(), self._result))

class SSHReadCompletion(SSHCompletionMixin, orchestrator.ReadCompletion):
    @property
    def is_complete(self):
        return all(map(lambda r: r.ready(), self._result))


class SSHWriteCompletion(SSHCompletionMixin, orchestrator.WriteCompletion):

    @property
    def is_persistent(self):
        return all(map(lambda r: r.ready(), self._result))

    @property
    def is_effective(self):
        return all(map(lambda r: r.ready(), self._result))

    @property
    def is_errored(self):
        for r in self._result:
            if not r.ready():
                return False
            if not r.successful():
                return True
        return False

class SSHWriteCompletionReady(SSHWriteCompletion):
    def __init__(self, result):
        orchestrator.WriteCompletion.__init__(self)
        self._result = result

    @property
    def result(self):
        return self._result

    @property
    def is_persistent(self):
        return True

    @property
    def is_effective(self):
        return True

    @property
    def is_errored(self):
        return False

class SSHConnection(object):
    """
    Tie tempfile lifetime (e.g. ssh_config) to a remoto connection.
    """
    def __init__(self):
        self.conn = None
        self.temp_file = None

    # proxy to the remoto connection
    def __getattr__(self, name):
        return getattr(self.conn, name)


def log_exceptions(f):
    if six.PY3:
        return f
    else:
        # Python 2 does no exception chaining, thus the
        # real exception is lost
        @wraps(f)
        def wrapper(*args, **kwargs):
            try:
                return f(*args, **kwargs)
            except Exception:
                logger.exception('something went wrong.')
                raise
        return wrapper


class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):

    _STORE_HOST_PREFIX = "host"
    _DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN = 10

    MODULE_OPTIONS = [
        {'name': 'ssh_config_file'},
        {'name': 'inventory_cache_timeout_min'},
    ]
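    # These options live in the mgr config store. A sketch of how they would
    # typically be set (the `mgr/<module>/<option>` key layout is assumed
    # here, and the path is a placeholder):
    #   ceph config set mgr mgr/ssh/ssh_config_file /path/to/ssh_config
    #   ceph config set mgr mgr/ssh/inventory_cache_timeout_min 10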
125 | ||
126 | COMMANDS = [ | |
127 | { | |
128 | 'cmd': 'ssh set-ssh-config', | |
129 | 'desc': 'Set the ssh_config file (use -i <ssh_config>)', | |
130 | 'perm': 'rw' | |
131 | }, | |
132 | { | |
133 | 'cmd': 'ssh clear-ssh-config', | |
134 | 'desc': 'Clear the ssh_config file', | |
135 | 'perm': 'rw' | |
136 | }, | |
137 | ] | |
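    # Example CLI usage for the commands above (the config path is a
    # placeholder; `-i` feeds the file to the command as `inbuf`):
    #   ceph ssh set-ssh-config -i /path/to/ssh_config
    #   ceph ssh clear-ssh-config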
138 | ||
139 | def __init__(self, *args, **kwargs): | |
140 | super(SSHOrchestrator, self).__init__(*args, **kwargs) | |
141 | self._cluster_fsid = None | |
142 | self._worker_pool = multiprocessing.pool.ThreadPool(1) | |
143 | ||
494da23a TL |
144 | # the keys in inventory_cache are authoritative. |
145 | # You must not call remove_outdated() | |
146 | # The values are cached by instance. | |
147 | # cache is invalidated by | |
148 | # 1. timeout | |
149 | # 2. refresh parameter | |
150 | self.inventory_cache = orchestrator.OutdatablePersistentDict(self, self._STORE_HOST_PREFIX) | |
151 | ||
    def handle_command(self, inbuf, command):
        if command["prefix"] == "ssh set-ssh-config":
            return self._set_ssh_config(inbuf, command)
        elif command["prefix"] == "ssh clear-ssh-config":
            return self._clear_ssh_config(inbuf, command)
        else:
            raise NotImplementedError(command["prefix"])

    @staticmethod
    def can_run():
        if remoto is not None:
            return True, ""
        else:
            return False, "loading remoto library: {}".format(
                remoto_import_error)

    def available(self):
        """
        The SSH orchestrator is available whenever the remoto library
        could be imported.
        """
        return self.can_run()
173 | ||
174 | def wait(self, completions): | |
175 | self.log.info("wait: completions={}".format(completions)) | |
176 | ||
177 | complete = True | |
178 | for c in completions: | |
494da23a TL |
179 | if c.is_complete: |
180 | continue | |
181 | ||
11fdf7f2 TL |
182 | if not isinstance(c, SSHReadCompletion) and \ |
183 | not isinstance(c, SSHWriteCompletion): | |
184 | raise TypeError("unexpected completion: {}".format(c.__class__)) | |
185 | ||
11fdf7f2 TL |
186 | complete = False |
187 | ||
188 | return complete | |
189 | ||
    def _get_cluster_fsid(self):
        """
        Fetch and cache the cluster fsid.
        """
        if not self._cluster_fsid:
            self._cluster_fsid = self.get("mon_map")["fsid"]
        assert isinstance(self._cluster_fsid, six.string_types)
        return self._cluster_fsid

    def _require_hosts(self, hosts):
        """
        Raise an error if any of the given hosts are unregistered.
        """
        if isinstance(hosts, six.string_types):
            hosts = [hosts]
        keys = self.inventory_cache.keys()
        unregistered_hosts = set(hosts) - keys
        if unregistered_hosts:
            logger.warning('keys = {}'.format(keys))
            raise RuntimeError("Host(s) {} not registered".format(
                ", ".join(map(lambda h: "'{}'".format(h),
                              unregistered_hosts))))

    def _set_ssh_config(self, inbuf, command):
        """
        Set an ssh_config file provided from stdin.

        TODO:
          - validation
        """
        if len(inbuf) == 0:
            return errno.EINVAL, "", "empty ssh config provided"
        self.set_store("ssh_config", inbuf)
        return 0, "", ""

    def _clear_ssh_config(self, inbuf, command):
        """
        Clear the stored ssh_config file.
        """
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        return 0, "", ""

    def _get_connection(self, host):
        """
        Setup a connection for running commands on remote host.
        """
        ssh_options = None

        conn = SSHConnection()

        ssh_config = self.get_store("ssh_config")
        if ssh_config is not None:
            conn.temp_file = tempfile.NamedTemporaryFile()
            conn.temp_file.write(ssh_config.encode('utf-8'))
            conn.temp_file.flush()  # make visible to other processes
            ssh_config_fname = conn.temp_file.name
        else:
            ssh_config_fname = self.get_localized_module_option("ssh_config_file")

        if ssh_config_fname:
            if not os.path.isfile(ssh_config_fname):
                raise Exception("ssh_config \"{}\" does not exist".format(
                    ssh_config_fname))
            ssh_options = "-F {}".format(ssh_config_fname)

        self.log.info("opening connection to host '{}' with ssh "
                      "options '{}'".format(host, ssh_options))

        conn.conn = remoto.Connection(host,
                                      logger=self.log,
                                      detect_sudo=True,
                                      ssh_options=ssh_options)

        conn.conn.import_module(remotes)

        return conn

    def _executable_path(self, conn, executable):
        """
        Verify that the given executable is available on the remote host
        reachable through `conn`, returning its full path if it is.

        Otherwise raise an error naming the missing executable and the host.
        """
        executable_path = conn.remote_module.which(executable)
        if not executable_path:
            raise RuntimeError("Executable '{}' not found on host '{}'".format(
                executable, conn.hostname))
        self.log.info("Found executable '{}' at path '{}'".format(executable,
                                                                  executable_path))
        return executable_path

    def _build_ceph_conf(self):
        """
        Build a minimal `ceph.conf` containing the current monitor hosts.

        Notes:
          - ceph-volume complains if no section header (e.g. global) exists
          - other ceph cli tools complained about a missing newline at EOF

        TODO:
          - messenger v2 syntax?
        """
        mon_map = self.get("mon_map")
        mon_addrs = map(lambda m: m["addr"], mon_map["mons"])
        mon_hosts = ", ".join(mon_addrs)
        return "[global]\nmon host = {}\n".format(mon_hosts)

    def _ensure_ceph_conf(self, conn, network=False):
        """
        Install ceph.conf on remote node if it doesn't exist.
        """
        conf = self._build_ceph_conf()
        if network:
            conf += "public_network = {}\n".format(network)
        conn.remote_module.write_conf("/etc/ceph/ceph.conf", conf)

    def _get_bootstrap_key(self, service_type):
        """
        Fetch a bootstrap key for a service type.

        :param service_type: name (e.g. mds, osd, mon, ...)
        """
        identity_dict = {
            'admin': 'client.admin',
            'mds': 'client.bootstrap-mds',
            'mgr': 'client.bootstrap-mgr',
            'osd': 'client.bootstrap-osd',
            'rgw': 'client.bootstrap-rgw',
            'mon': 'mon.'
        }

        identity = identity_dict[service_type]

        ret, out, err = self.mon_command({
            "prefix": "auth get",
            "entity": identity
        })
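        # On success `out` holds the entity's keyring in plain text, roughly
        # like the sketch below (key value and caps are placeholders):
        #   [client.bootstrap-osd]
        #       key = AQD...
        #       caps mon = "allow profile bootstrap-osd"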
329 | ||
330 | if ret == -errno.ENOENT: | |
331 | raise RuntimeError("Entity '{}' not found: '{}'".format(identity, err)) | |
332 | elif ret != 0: | |
333 | raise RuntimeError("Error retrieving key for '{}' ret {}: '{}'".format( | |
334 | identity, ret, err)) | |
335 | ||
336 | return out | |
337 | ||
    def _bootstrap_mgr(self, conn):
        """
        Bootstrap a manager.

        1. install a copy of ceph.conf
        2. install the manager bootstrap key

        :param conn: remote host connection
        """
        self._ensure_ceph_conf(conn)
        keyring = self._get_bootstrap_key("mgr")
        keyring_path = "/var/lib/ceph/bootstrap-mgr/ceph.keyring"
        conn.remote_module.write_keyring(keyring_path, keyring)
        return keyring_path

    def _bootstrap_osd(self, conn):
        """
        Bootstrap an osd.

        1. install a copy of ceph.conf
        2. install the osd bootstrap key

        :param conn: remote host connection
        """
        self._ensure_ceph_conf(conn)
        keyring = self._get_bootstrap_key("osd")
        keyring_path = "/var/lib/ceph/bootstrap-osd/ceph.keyring"
        conn.remote_module.write_keyring(keyring_path, keyring)
        return keyring_path

    def _get_hosts(self, wanted=None):
        return self.inventory_cache.items_filtered(wanted)

    def add_host(self, host):
        """
        Add a host to be managed by the orchestrator.

        :param host: host name
        """
        @log_exceptions
        def run(host):
            self.inventory_cache[host] = orchestrator.OutdatableData()
            return "Added host '{}'".format(host)

        return SSHWriteCompletion(
            self._worker_pool.apply_async(run, (host,)))

    def remove_host(self, host):
        """
        Remove a host from orchestrator management.

        :param host: host name
        """
        @log_exceptions
        def run(host):
            del self.inventory_cache[host]
            return "Removed host '{}'".format(host)

        return SSHWriteCompletion(
            self._worker_pool.apply_async(run, (host,)))

    def get_hosts(self):
        """
        Return a list of hosts managed by the orchestrator.

        Notes:
          - skip async: manager reads from cache.

        TODO:
          - InventoryNode probably needs to be able to report labels
        """
        nodes = [orchestrator.InventoryNode(host_name, []) for host_name in self.inventory_cache]
        return orchestrator.TrivialReadCompletion(nodes)

    def _get_device_inventory(self, host):
        """
        Query storage devices on a remote node.

        :return: list of InventoryDevice
        """
        conn = self._get_connection(host)

        try:
            ceph_volume_executable = self._executable_path(conn, 'ceph-volume')
            command = [
                ceph_volume_executable,
                "inventory",
                "--format=json"
            ]
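            # `ceph-volume inventory --format=json` prints a JSON list of
            # device records; fields such as "path" and "available" are
            # expected below (the exact schema depends on the ceph-volume
            # version).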
427 | ||
428 | out, err, code = remoto.process.check(conn, command) | |
429 | host_devices = json.loads(out[0]) | |
430 | return host_devices | |
431 | ||
432 | except Exception as ex: | |
433 | self.log.exception(ex) | |
434 | raise | |
435 | ||
436 | finally: | |
437 | conn.exit() | |
438 | ||
    def get_inventory(self, node_filter=None, refresh=False):
        """
        Return the storage inventory of nodes matching the given filter.

        :param node_filter: node filter

        TODO:
          - add filtering by label
        """
        if node_filter:
            hosts = node_filter.nodes
            self._require_hosts(hosts)
            hosts = self._get_hosts(hosts)
        else:
            # this implies the returned hosts are registered
            hosts = self._get_hosts()

        @log_exceptions
        def run(host, host_info):
            # type: (str, orchestrator.OutdatableData) -> orchestrator.InventoryNode

            timeout_min = int(self.get_module_option(
                "inventory_cache_timeout_min",
                self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN))

            if host_info.outdated(timeout_min) or refresh:
                self.log.info("refresh stale inventory for '{}'".format(host))
                data = self._get_device_inventory(host)
                host_info = orchestrator.OutdatableData(data)
                self.inventory_cache[host] = host_info
            else:
                self.log.debug("reading cached inventory for '{}'".format(host))

            devices = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(host_info.data)
            return orchestrator.InventoryNode(host, devices)

        results = []
        for key, host_info in hosts:
            result = self._worker_pool.apply_async(run, (key, host_info))
            results.append(result)

        return SSHReadCompletion(results)

    @log_exceptions
    def _create_osd(self, host, drive_group):
        conn = self._get_connection(host)
        try:
            devices = drive_group.data_devices.paths
            self._bootstrap_osd(conn)

            for device in devices:
                ceph_volume_executable = self._executable_path(conn, "ceph-volume")
                command = [
                    ceph_volume_executable,
                    "lvm",
                    "create",
                    "--cluster-fsid", self._get_cluster_fsid(),
                    "--{}".format(drive_group.objectstore),
                    "--data", device
                ]
                remoto.process.run(conn, command)

            return "Created osd on host '{}'".format(host)

        finally:
            conn.exit()

    def create_osds(self, drive_group, all_hosts=None):
        """
        Create a new osd.

        The orchestrator CLI currently handles a narrow form of drive
        specification defined by a single block device using bluestore.

        :param drive_group: osd specification

        TODO:
          - support full drive_group specification
          - support batch creation
        """
        assert len(drive_group.hosts(all_hosts)) == 1
        assert len(drive_group.data_devices.paths) > 0
        assert all(map(lambda p: isinstance(p, six.string_types),
                       drive_group.data_devices.paths))

        host = drive_group.hosts(all_hosts)[0]
        self._require_hosts(host)

        result = self._worker_pool.apply_async(self._create_osd, (host,
                                                                  drive_group))

        return SSHWriteCompletion(result)
534 | ||
535 | def _create_mon(self, host, network): | |
536 | """ | |
537 | Create a new monitor on the given host. | |
538 | """ | |
539 | self.log.info("create_mon({}:{}): starting".format(host, network)) | |
540 | ||
541 | conn = self._get_connection(host) | |
542 | ||
543 | try: | |
544 | self._ensure_ceph_conf(conn, network) | |
545 | ||
546 | uid = conn.remote_module.path_getuid("/var/lib/ceph") | |
547 | gid = conn.remote_module.path_getgid("/var/lib/ceph") | |
548 | ||
549 | # install client admin key on target mon host | |
550 | admin_keyring = self._get_bootstrap_key("admin") | |
551 | admin_keyring_path = '/etc/ceph/ceph.client.admin.keyring' | |
552 | conn.remote_module.write_keyring(admin_keyring_path, admin_keyring, uid, gid) | |
553 | ||
554 | mon_path = "/var/lib/ceph/mon/ceph-{name}".format(name=host) | |
555 | conn.remote_module.create_mon_path(mon_path, uid, gid) | |
556 | ||
557 | # bootstrap key | |
558 | conn.remote_module.safe_makedirs("/var/lib/ceph/tmp") | |
559 | monitor_keyring = self._get_bootstrap_key("mon") | |
560 | mon_keyring_path = "/var/lib/ceph/tmp/ceph-{name}.mon.keyring".format(name=host) | |
561 | conn.remote_module.write_file( | |
562 | mon_keyring_path, | |
563 | monitor_keyring, | |
564 | 0o600, | |
565 | None, | |
566 | uid, | |
567 | gid | |
568 | ) | |
569 | ||
570 | # monitor map | |
571 | monmap_path = "/var/lib/ceph/tmp/ceph.{name}.monmap".format(name=host) | |
572 | remoto.process.run(conn, | |
573 | ['ceph', 'mon', 'getmap', '-o', monmap_path], | |
574 | ) | |
575 | ||
576 | user_args = [] | |
577 | if uid != 0: | |
578 | user_args = user_args + [ '--setuser', str(uid) ] | |
579 | if gid != 0: | |
580 | user_args = user_args + [ '--setgroup', str(gid) ] | |
581 | ||
582 | remoto.process.run(conn, | |
583 | ['ceph-mon', '--mkfs', '-i', host, | |
584 | '--monmap', monmap_path, '--keyring', mon_keyring_path | |
585 | ] + user_args | |
586 | ) | |
587 | ||
588 | remoto.process.run(conn, | |
589 | ['systemctl', 'enable', 'ceph.target'], | |
590 | timeout=7, | |
591 | ) | |
592 | ||
593 | remoto.process.run(conn, | |
594 | ['systemctl', 'enable', 'ceph-mon@{name}'.format(name=host)], | |
595 | timeout=7, | |
596 | ) | |
597 | ||
598 | remoto.process.run(conn, | |
599 | ['systemctl', 'start', 'ceph-mon@{name}'.format(name=host)], | |
600 | timeout=7, | |
601 | ) | |
602 | ||
603 | return "Created mon on host '{}'".format(host) | |
604 | ||
605 | except Exception as e: | |
606 | self.log.error("create_mon({}:{}): error: {}".format(host, network, e)) | |
607 | raise | |
608 | ||
609 | finally: | |
610 | self.log.info("create_mon({}:{}): finished".format(host, network)) | |
611 | conn.exit() | |
612 | ||
    def update_mons(self, num, hosts):
        """
        Adjust the number of cluster monitors.
        """
        # current support limited to adding monitors.
        mon_map = self.get("mon_map")
        num_mons = len(mon_map["mons"])
        if num == num_mons:
            return SSHWriteCompletionReady("The requested number of monitors exist.")
        if num < num_mons:
            raise NotImplementedError("Removing monitors is not supported.")

        # check that all the hostnames are registered
        self._require_hosts(map(lambda h: h[0], hosts))

        # current support requires a network to be specified
        for host, network in hosts:
            if not network:
                raise RuntimeError("Host '{}' missing network "
                                   "part".format(host))

        # explicit placement: enough hosts provided?
        num_new_mons = num - num_mons
        if len(hosts) < num_new_mons:
            raise RuntimeError("Error: {} hosts provided, expected {}".format(
                len(hosts), num_new_mons))

        self.log.info("creating {} monitors on hosts: '{}'".format(
            num_new_mons, ",".join(map(lambda h: ":".join(h), hosts))))

        # TODO: we may want to chain the creation of the monitors so they join
        # the quorum one at a time.
        results = []
        for host, network in hosts:
            result = self._worker_pool.apply_async(self._create_mon, (host,
                                                                      network))
            results.append(result)

        return SSHWriteCompletion(results)
652 | ||
653 | def _create_mgr(self, host): | |
654 | """ | |
655 | Create a new manager instance on a host. | |
656 | """ | |
657 | self.log.info("create_mgr({}): starting".format(host)) | |
658 | ||
659 | conn = self._get_connection(host) | |
660 | ||
661 | try: | |
662 | bootstrap_keyring_path = self._bootstrap_mgr(conn) | |
663 | ||
664 | mgr_path = "/var/lib/ceph/mgr/ceph-{name}".format(name=host) | |
665 | conn.remote_module.safe_makedirs(mgr_path) | |
666 | keyring_path = os.path.join(mgr_path, "keyring") | |
667 | ||
668 | command = [ | |
669 | 'ceph', | |
670 | '--name', 'client.bootstrap-mgr', | |
671 | '--keyring', bootstrap_keyring_path, | |
672 | 'auth', 'get-or-create', 'mgr.{name}'.format(name=host), | |
673 | 'mon', 'allow profile mgr', | |
674 | 'osd', 'allow *', | |
675 | 'mds', 'allow *', | |
676 | '-o', | |
677 | keyring_path | |
678 | ] | |
679 | ||
680 | out, err, ret = remoto.process.check(conn, command) | |
681 | if ret != 0: | |
682 | raise Exception("oops") | |
683 | ||
684 | remoto.process.run(conn, | |
685 | ['systemctl', 'enable', 'ceph-mgr@{name}'.format(name=host)], | |
686 | timeout=7 | |
687 | ) | |
688 | ||
689 | remoto.process.run(conn, | |
690 | ['systemctl', 'start', 'ceph-mgr@{name}'.format(name=host)], | |
691 | timeout=7 | |
692 | ) | |
693 | ||
694 | remoto.process.run(conn, | |
695 | ['systemctl', 'enable', 'ceph.target'], | |
696 | timeout=7 | |
697 | ) | |
698 | ||
699 | return "Created mgr on host '{}'".format(host) | |
700 | ||
701 | except Exception as e: | |
702 | self.log.error("create_mgr({}): error: {}".format(host, e)) | |
703 | raise | |
704 | ||
705 | finally: | |
706 | self.log.info("create_mgr({}): finished".format(host)) | |
707 | conn.exit() | |
708 | ||
    def update_mgrs(self, num, hosts):
        """
        Adjust the number of cluster managers.
        """
        # current support limited to adding managers.
        mgr_map = self.get("mgr_map")
        num_mgrs = 1 if mgr_map["active_name"] else 0
        num_mgrs += len(mgr_map["standbys"])
        if num == num_mgrs:
            return SSHWriteCompletionReady("The requested number of managers exist.")
        if num < num_mgrs:
            raise NotImplementedError("Removing managers is not supported")

        # check that all the hosts are registered
        self._require_hosts(hosts)

        # we assume explicit placement: at least as many hosts must be
        # provided as the increase in the number of daemons.
        num_new_mgrs = num - num_mgrs
        if len(hosts) < num_new_mgrs:
            raise RuntimeError("Error: {} hosts provided, expected {}".format(
                len(hosts), num_new_mgrs))

        self.log.info("creating {} managers on hosts: '{}'".format(
            num_new_mgrs, ",".join(hosts)))

        results = []
        for i in range(num_new_mgrs):
            result = self._worker_pool.apply_async(self._create_mgr, (hosts[i],))
            results.append(result)

        return SSHWriteCompletion(results)