# ceph/src/pybind/mgr/ssh/module.py
import json
import errno
import logging
from functools import wraps

import six
import os
import tempfile
import multiprocessing.pool

from mgr_module import MgrModule
import orchestrator

from . import remotes

try:
    import remoto
    import remoto.process
except ImportError as e:
    remoto = None
    remoto_import_error = str(e)

logger = logging.getLogger(__name__)

# high-level TODO:
#  - bring over some of the protections from ceph-deploy that guard against
#    multiple bootstrapping / initialization

class SSHCompletionmMixin(object):
    def __init__(self, result):
        if isinstance(result, multiprocessing.pool.AsyncResult):
            self._result = [result]
        else:
            self._result = result
        assert isinstance(self._result, list)

    @property
    def result(self):
        return list(map(lambda r: r.get(), self._result))

class SSHReadCompletion(SSHCompletionmMixin, orchestrator.ReadCompletion):
    @property
    def is_complete(self):
        return all(map(lambda r: r.ready(), self._result))


class SSHWriteCompletion(SSHCompletionmMixin, orchestrator.WriteCompletion):

    @property
    def is_persistent(self):
        return all(map(lambda r: r.ready(), self._result))

    @property
    def is_effective(self):
        return all(map(lambda r: r.ready(), self._result))

    @property
    def is_errored(self):
        for r in self._result:
            if not r.ready():
                return False
            if not r.successful():
                return True
        return False

class SSHWriteCompletionReady(SSHWriteCompletion):
    def __init__(self, result):
        orchestrator.WriteCompletion.__init__(self)
        self._result = result

    @property
    def result(self):
        return self._result

    @property
    def is_persistent(self):
        return True

    @property
    def is_effective(self):
        return True

    @property
    def is_errored(self):
        return False

class SSHConnection(object):
    """
    Tie tempfile lifetime (e.g. ssh_config) to a remoto connection.
    """
    def __init__(self):
        self.conn = None
        self.temp_file = None

    # proxy to the remoto connection
    def __getattr__(self, name):
        return getattr(self.conn, name)


def log_exceptions(f):
    if six.PY3:
        return f
    else:
        # Python 2 does no exception chaining, thus the
        # real exception is lost
        @wraps(f)
        def wrapper(*args, **kwargs):
            try:
                return f(*args, **kwargs)
            except Exception:
                logger.exception('something went wrong.')
                raise
        return wrapper


class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):

    _STORE_HOST_PREFIX = "host"
    _DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN = 10

    MODULE_OPTIONS = [
        {'name': 'ssh_config_file'},
        {'name': 'inventory_cache_timeout_min'},
    ]
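    # Illustrative sketch (not from the original source): module options like
    # these are normally set through the mgr config interface, assuming the
    # standard mgr/<module>/<option> naming, e.g.:
    #   ceph config set mgr mgr/ssh/ssh_config_file /path/to/ssh_config
    #   ceph config set mgr mgr/ssh/inventory_cache_timeout_min 10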

    COMMANDS = [
        {
            'cmd': 'ssh set-ssh-config',
            'desc': 'Set the ssh_config file (use -i <ssh_config>)',
            'perm': 'rw'
        },
        {
            'cmd': 'ssh clear-ssh-config',
            'desc': 'Clear the ssh_config file',
            'perm': 'rw'
        },
    ]
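    # Example invocations of the two commands above (illustrative; derived
    # from the 'cmd' and 'desc' strings rather than taken from this file):
    #   ceph ssh set-ssh-config -i /path/to/ssh_config
    #   ceph ssh clear-ssh-config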

    def __init__(self, *args, **kwargs):
        super(SSHOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid = None
        self._worker_pool = multiprocessing.pool.ThreadPool(1)

        # the keys in inventory_cache are authoritative.
        #   You must not call remove_outdated()
        # The values are cached by instance.
        # cache is invalidated by
        # 1. timeout
        # 2. refresh parameter
        self.inventory_cache = orchestrator.OutdatablePersistentDict(self, self._STORE_HOST_PREFIX)

    def handle_command(self, inbuf, command):
        if command["prefix"] == "ssh set-ssh-config":
            return self._set_ssh_config(inbuf, command)
        elif command["prefix"] == "ssh clear-ssh-config":
            return self._clear_ssh_config(inbuf, command)
        else:
            raise NotImplementedError(command["prefix"])

    @staticmethod
    def can_run():
        if remoto is not None:
            return True, ""
        else:
            return False, "loading remoto library:{}".format(
                remoto_import_error)

    def available(self):
        """
        Report whether the orchestrator can be used, i.e. whether the
        remoto library could be imported.
        """
        return self.can_run()

    def wait(self, completions):
        self.log.info("wait: completions={}".format(completions))

        complete = True
        for c in completions:
            if c.is_complete:
                continue

            if not isinstance(c, SSHReadCompletion) and \
                    not isinstance(c, SSHWriteCompletion):
                raise TypeError("unexpected completion: {}".format(c.__class__))

            complete = False

        return complete

    def _get_cluster_fsid(self):
        """
        Fetch and cache the cluster fsid.
        """
        if not self._cluster_fsid:
            self._cluster_fsid = self.get("mon_map")["fsid"]
        assert isinstance(self._cluster_fsid, six.string_types)
        return self._cluster_fsid

    def _require_hosts(self, hosts):
        """
        Raise an error if any of the given hosts are unregistered.
        """
        if isinstance(hosts, six.string_types):
            hosts = [hosts]
        keys = self.inventory_cache.keys()
        unregistered_hosts = set(hosts) - keys
        if unregistered_hosts:
            logger.warning('keys = {}'.format(keys))
            raise RuntimeError("Host(s) {} not registered".format(
                ", ".join(map(lambda h: "'{}'".format(h),
                              unregistered_hosts))))

    def _set_ssh_config(self, inbuf, command):
        """
        Set an ssh_config file provided from stdin

        TODO:
          - validation
        """
        if len(inbuf) == 0:
            return errno.EINVAL, "", "empty ssh config provided"
        self.set_store("ssh_config", inbuf)
        return 0, "", ""

    def _clear_ssh_config(self, inbuf, command):
        """
        Clear the stored ssh_config file.
        """
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        return 0, "", ""

    def _get_connection(self, host):
        """
        Set up a connection for running commands on the remote host.
        """
        ssh_options = None

        conn = SSHConnection()

        ssh_config = self.get_store("ssh_config")
        if ssh_config is not None:
            conn.temp_file = tempfile.NamedTemporaryFile()
            conn.temp_file.write(ssh_config.encode('utf-8'))
            conn.temp_file.flush()  # make visible to other processes
            ssh_config_fname = conn.temp_file.name
        else:
            ssh_config_fname = self.get_localized_module_option("ssh_config_file")

        if ssh_config_fname:
            if not os.path.isfile(ssh_config_fname):
                raise Exception("ssh_config \"{}\" does not exist".format(ssh_config_fname))
            ssh_options = "-F {}".format(ssh_config_fname)

        self.log.info("opening connection to host '{}' with ssh "
                      "options '{}'".format(host, ssh_options))

        conn.conn = remoto.Connection(host,
                                      logger=self.log,
                                      detect_sudo=True,
                                      ssh_options=ssh_options)

        conn.conn.import_module(remotes)

        return conn

    def _executable_path(self, conn, executable):
        """
        Remote validator that accepts a connection object, checks that a given
        executable is available, and returns its full path if so.

        Otherwise an exception with thorough details is raised, informing the
        user that the executable was not found.
        """
        executable_path = conn.remote_module.which(executable)
        if not executable_path:
            raise RuntimeError("Executable '{}' not found on host '{}'".format(
                executable, conn.hostname))
        self.log.info("Found executable '{}' at path '{}'".format(executable,
                                                                  executable_path))
        return executable_path

    def _build_ceph_conf(self):
        """
        Build a minimal `ceph.conf` containing the current monitor hosts.

        Notes:
          - ceph-volume complains if no section header (e.g. global) exists
          - other ceph cli tools complained about no EOF newline

        TODO:
          - messenger v2 syntax?
        """
        mon_map = self.get("mon_map")
        mon_addrs = map(lambda m: m["addr"], mon_map["mons"])
        mon_hosts = ", ".join(mon_addrs)
        return "[global]\nmon host = {}\n".format(mon_hosts)

    def _ensure_ceph_conf(self, conn, network=False):
        """
        Install ceph.conf on remote node if it doesn't exist.
        """
        conf = self._build_ceph_conf()
        if network:
            conf += "public_network = {}\n".format(network)
        conn.remote_module.write_conf("/etc/ceph/ceph.conf", conf)

    def _get_bootstrap_key(self, service_type):
        """
        Fetch a bootstrap key for a service type.

        :param service_type: name (e.g. mds, osd, mon, ...)
        """
        identity_dict = {
            'admin': 'client.admin',
            'mds': 'client.bootstrap-mds',
            'mgr': 'client.bootstrap-mgr',
            'osd': 'client.bootstrap-osd',
            'rgw': 'client.bootstrap-rgw',
            'mon': 'mon.'
        }

        identity = identity_dict[service_type]

        ret, out, err = self.mon_command({
            "prefix": "auth get",
            "entity": identity
        })
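        # 'out' carries the keyring text returned by 'ceph auth get <entity>',
        # e.g. (illustrative, key shortened):
        #   [client.bootstrap-osd]
        #       key = AQBx...==
        #       caps mon = "allow profile bootstrap-osd"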

        if ret == -errno.ENOENT:
            raise RuntimeError("Entity '{}' not found: '{}'".format(identity, err))
        elif ret != 0:
            raise RuntimeError("Error retrieving key for '{}' ret {}: '{}'".format(
                identity, ret, err))

        return out

    def _bootstrap_mgr(self, conn):
        """
        Bootstrap a manager.

        1. install a copy of ceph.conf
        2. install the manager bootstrap key

        :param conn: remote host connection
        """
        self._ensure_ceph_conf(conn)
        keyring = self._get_bootstrap_key("mgr")
        keyring_path = "/var/lib/ceph/bootstrap-mgr/ceph.keyring"
        conn.remote_module.write_keyring(keyring_path, keyring)
        return keyring_path

    def _bootstrap_osd(self, conn):
        """
        Bootstrap an osd.

        1. install a copy of ceph.conf
        2. install the osd bootstrap key

        :param conn: remote host connection
        """
        self._ensure_ceph_conf(conn)
        keyring = self._get_bootstrap_key("osd")
        keyring_path = "/var/lib/ceph/bootstrap-osd/ceph.keyring"
        conn.remote_module.write_keyring(keyring_path, keyring)
        return keyring_path

    def _get_hosts(self, wanted=None):
        return self.inventory_cache.items_filtered(wanted)

    def add_host(self, host):
        """
        Add a host to be managed by the orchestrator.

        :param host: host name
        """
        @log_exceptions
        def run(host):
            self.inventory_cache[host] = orchestrator.OutdatableData()
            return "Added host '{}'".format(host)

        return SSHWriteCompletion(
            self._worker_pool.apply_async(run, (host,)))

    def remove_host(self, host):
        """
        Remove a host from orchestrator management.

        :param host: host name
        """
        @log_exceptions
        def run(host):
            del self.inventory_cache[host]
            return "Removed host '{}'".format(host)

        return SSHWriteCompletion(
            self._worker_pool.apply_async(run, (host,)))

    def get_hosts(self):
        """
        Return a list of hosts managed by the orchestrator.

        Notes:
          - skip async: manager reads from cache.

        TODO:
          - InventoryNode probably needs to be able to report labels
        """
        nodes = [orchestrator.InventoryNode(host_name, []) for host_name in self.inventory_cache]
        return orchestrator.TrivialReadCompletion(nodes)

    def _get_device_inventory(self, host):
        """
        Query storage devices on a remote node.

        :return: list of InventoryDevice
        """
        conn = self._get_connection(host)

        try:
            ceph_volume_executable = self._executable_path(conn, 'ceph-volume')
            command = [
                ceph_volume_executable,
                "inventory",
                "--format=json"
            ]
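            # 'ceph-volume inventory --format=json' prints a JSON list with one
            # record per device; illustrative (abridged) shape, assumed rather
            # than taken from this file:
            #   [{"path": "/dev/sdb", "available": true, "sys_api": {...}}, ...]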

            out, err, code = remoto.process.check(conn, command)
            host_devices = json.loads(out[0])
            return host_devices

        except Exception as ex:
            self.log.exception(ex)
            raise

        finally:
            conn.exit()

    def get_inventory(self, node_filter=None, refresh=False):
        """
        Return the storage inventory of nodes matching the given filter.

        :param node_filter: node filter

        TODO:
          - add filtering by label
        """
        if node_filter:
            hosts = node_filter.nodes
            self._require_hosts(hosts)
            hosts = self._get_hosts(hosts)
        else:
            # this implies the returned hosts are registered
            hosts = self._get_hosts()

        @log_exceptions
        def run(host, host_info):
            # type: (str, orchestrator.OutdatableData) -> orchestrator.InventoryNode

            timeout_min = int(self.get_module_option(
                "inventory_cache_timeout_min",
                self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN))

            if host_info.outdated(timeout_min) or refresh:
                self.log.info("refresh stale inventory for '{}'".format(host))
                data = self._get_device_inventory(host)
                host_info = orchestrator.OutdatableData(data)
                self.inventory_cache[host] = host_info
            else:
                self.log.debug("reading cached inventory for '{}'".format(host))

            devices = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(host_info.data)
            return orchestrator.InventoryNode(host, devices)

        results = []
        for key, host_info in hosts:
            result = self._worker_pool.apply_async(run, (key, host_info))
            results.append(result)

        return SSHReadCompletion(results)

    @log_exceptions
    def _create_osd(self, host, drive_group):
        conn = self._get_connection(host)
        try:
            devices = drive_group.data_devices.paths
            self._bootstrap_osd(conn)

            for device in devices:
                ceph_volume_executable = self._executable_path(conn, "ceph-volume")
                command = [
                    ceph_volume_executable,
                    "lvm",
                    "create",
                    "--cluster-fsid", self._get_cluster_fsid(),
                    "--{}".format(drive_group.objectstore),
                    "--data", device
                ]
                remoto.process.run(conn, command)

            return "Created osd on host '{}'".format(host)

        except:
            raise

        finally:
            conn.exit()

    def create_osds(self, drive_group, all_hosts=None):
        """
        Create a new osd.

        The orchestrator CLI currently handles a narrow form of drive
        specification defined by a single block device using bluestore.

        :param drive_group: osd specification

        TODO:
          - support full drive_group specification
          - support batch creation
        """
        assert len(drive_group.hosts(all_hosts)) == 1
        assert len(drive_group.data_devices.paths) > 0
        assert all(map(lambda p: isinstance(p, six.string_types),
                       drive_group.data_devices.paths))

        host = drive_group.hosts(all_hosts)[0]
        self._require_hosts(host)

        result = self._worker_pool.apply_async(self._create_osd, (host,
                                                                  drive_group))

        return SSHWriteCompletion(result)

    def _create_mon(self, host, network):
        """
        Create a new monitor on the given host.
        """
        self.log.info("create_mon({}:{}): starting".format(host, network))

        conn = self._get_connection(host)

        try:
            self._ensure_ceph_conf(conn, network)

            uid = conn.remote_module.path_getuid("/var/lib/ceph")
            gid = conn.remote_module.path_getgid("/var/lib/ceph")

            # install client admin key on target mon host
            admin_keyring = self._get_bootstrap_key("admin")
            admin_keyring_path = '/etc/ceph/ceph.client.admin.keyring'
            conn.remote_module.write_keyring(admin_keyring_path, admin_keyring, uid, gid)

            mon_path = "/var/lib/ceph/mon/ceph-{name}".format(name=host)
            conn.remote_module.create_mon_path(mon_path, uid, gid)

            # bootstrap key
            conn.remote_module.safe_makedirs("/var/lib/ceph/tmp")
            monitor_keyring = self._get_bootstrap_key("mon")
            mon_keyring_path = "/var/lib/ceph/tmp/ceph-{name}.mon.keyring".format(name=host)
            conn.remote_module.write_file(
                mon_keyring_path,
                monitor_keyring,
                0o600,
                None,
                uid,
                gid
            )

            # monitor map
            monmap_path = "/var/lib/ceph/tmp/ceph.{name}.monmap".format(name=host)
            remoto.process.run(conn,
                ['ceph', 'mon', 'getmap', '-o', monmap_path],
            )

            user_args = []
            if uid != 0:
                user_args = user_args + ['--setuser', str(uid)]
            if gid != 0:
                user_args = user_args + ['--setgroup', str(gid)]

            remoto.process.run(conn,
                ['ceph-mon', '--mkfs', '-i', host,
                 '--monmap', monmap_path, '--keyring', mon_keyring_path
                 ] + user_args
            )

            remoto.process.run(conn,
                ['systemctl', 'enable', 'ceph.target'],
                timeout=7,
            )

            remoto.process.run(conn,
                ['systemctl', 'enable', 'ceph-mon@{name}'.format(name=host)],
                timeout=7,
            )

            remoto.process.run(conn,
                ['systemctl', 'start', 'ceph-mon@{name}'.format(name=host)],
                timeout=7,
            )

            return "Created mon on host '{}'".format(host)

        except Exception as e:
            self.log.error("create_mon({}:{}): error: {}".format(host, network, e))
            raise

        finally:
            self.log.info("create_mon({}:{}): finished".format(host, network))
            conn.exit()

    def update_mons(self, num, hosts):
        """
        Adjust the number of cluster monitors.
        """
        # current support limited to adding monitors.
        mon_map = self.get("mon_map")
        num_mons = len(mon_map["mons"])
        if num == num_mons:
            return SSHWriteCompletionReady("The requested number of monitors exist.")
        if num < num_mons:
            raise NotImplementedError("Removing monitors is not supported.")

        # check that all the hostnames are registered
        self._require_hosts(map(lambda h: h[0], hosts))

        # current support requires a network to be specified
        for host, network in hosts:
            if not network:
                raise RuntimeError("Host '{}' missing network "
                                   "part".format(host))

        # explicit placement: enough hosts provided?
        num_new_mons = num - num_mons
        if len(hosts) < num_new_mons:
            raise RuntimeError("Error: {} hosts provided, expected {}".format(
                len(hosts), num_new_mons))

        self.log.info("creating {} monitors on hosts: '{}'".format(
            num_new_mons, ",".join(map(lambda h: ":".join(h), hosts))))

        # TODO: we may want to chain the creation of the monitors so they join
        # the quorum one at a time.
        results = []
        for host, network in hosts:
            result = self._worker_pool.apply_async(self._create_mon, (host,
                                                                      network))
            results.append(result)

        return SSHWriteCompletion(results)

    def _create_mgr(self, host):
        """
        Create a new manager instance on a host.
        """
        self.log.info("create_mgr({}): starting".format(host))

        conn = self._get_connection(host)

        try:
            bootstrap_keyring_path = self._bootstrap_mgr(conn)

            mgr_path = "/var/lib/ceph/mgr/ceph-{name}".format(name=host)
            conn.remote_module.safe_makedirs(mgr_path)
            keyring_path = os.path.join(mgr_path, "keyring")

            command = [
                'ceph',
                '--name', 'client.bootstrap-mgr',
                '--keyring', bootstrap_keyring_path,
                'auth', 'get-or-create', 'mgr.{name}'.format(name=host),
                'mon', 'allow profile mgr',
                'osd', 'allow *',
                'mds', 'allow *',
                '-o',
                keyring_path
            ]

            out, err, ret = remoto.process.check(conn, command)
            if ret != 0:
                raise Exception("Error creating keyring for mgr.{}: {}".format(host, err))

            remoto.process.run(conn,
                ['systemctl', 'enable', 'ceph-mgr@{name}'.format(name=host)],
                timeout=7
            )

            remoto.process.run(conn,
                ['systemctl', 'start', 'ceph-mgr@{name}'.format(name=host)],
                timeout=7
            )

            remoto.process.run(conn,
                ['systemctl', 'enable', 'ceph.target'],
                timeout=7
            )

            return "Created mgr on host '{}'".format(host)

        except Exception as e:
            self.log.error("create_mgr({}): error: {}".format(host, e))
            raise

        finally:
            self.log.info("create_mgr({}): finished".format(host))
            conn.exit()

    def update_mgrs(self, num, hosts):
        """
        Adjust the number of cluster managers.
        """
        # current support limited to adding managers.
        mgr_map = self.get("mgr_map")
        num_mgrs = 1 if mgr_map["active_name"] else 0
        num_mgrs += len(mgr_map["standbys"])
        if num == num_mgrs:
            return SSHWriteCompletionReady("The requested number of managers exist.")
        if num < num_mgrs:
            raise NotImplementedError("Removing managers is not supported")

        # check that all the hosts are registered
        self._require_hosts(hosts)

        # we assume explicit placement by which there are the same number of
        # hosts specified as the size of increase in number of daemons.
        num_new_mgrs = num - num_mgrs
        if len(hosts) < num_new_mgrs:
            raise RuntimeError("Error: {} hosts provided, expected {}".format(
                len(hosts), num_new_mgrs))

        self.log.info("creating {} managers on hosts: '{}'".format(
            num_new_mgrs, ",".join(hosts)))

        results = []
        for i in range(num_new_mgrs):
            result = self._worker_pool.apply_async(self._create_mgr, (hosts[i],))
            results.append(result)

        return SSHWriteCompletion(results)