import asyncio
import concurrent
import json
import errno
import ipaddress
import logging
import re
import shlex
from collections import defaultdict
from configparser import ConfigParser
from contextlib import contextmanager
from functools import wraps
from tempfile import TemporaryDirectory, NamedTemporaryFile
from urllib.error import HTTPError
from threading import Event

from cephadm.service_discovery import ServiceDiscovery

import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
    Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, \
    Awaitable, Iterator

import datetime
import os
import random
import multiprocessing.pool
import subprocess
from prettytable import PrettyTable

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
    ServiceSpec, PlacementSpec, \
    HostPlacementSpec, IngressSpec, \
    TunedProfileSpec, IscsiServiceSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.http_server import CephadmHttpServer
from cephadm.agent import CephadmAgentHelpers


from mgr_module import (
    MgrModule,
    HandleCommandResult,
    Option,
    NotifyType,
    MonCommandFailed,
)
from mgr_util import build_url
import orchestrator
from orchestrator.module import to_format, Format

from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
    CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error, \
    service_to_daemon_types
from orchestrator._interface import GenericSpec
from orchestrator._interface import daemon_type_to_service

from . import utils
from . import ssh
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
    RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent, \
    CephExporterService
from .services.ingress import IngressService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
from .services.nvmeof import NvmeofService
from .services.nfs import NFSService
from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
    NodeExporterService, SNMPGatewayService, LokiService, PromtailService
from .services.jaeger import ElasticSearchService, JaegerAgentService, JaegerCollectorService, JaegerQueryService
from .services.node_proxy import NodeProxy
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \
    ClientKeyringStore, ClientKeyringSpec, TunedProfileStore, NodeProxyCache
from .upgrade import CephadmUpgrade
from .template import TemplateMgr
from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
    cephadmNoImage, CEPH_UPGRADE_ORDER, SpecialHostLabels
from .configchecks import CephadmConfigChecks
from .offline_watcher import OfflineHostWatcher
from .tuned_profiles import TunedProfileUtils

try:
    import asyncssh
except ImportError as e:
    asyncssh = None  # type: ignore
    asyncssh_import_error = str(e)

logger = logging.getLogger(__name__)

T = TypeVar('T')

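# Fallback SSH config used when neither ssh_config_file nor a stored
# ssh_config has been provided (see _get_ssh_config below).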
DEFAULT_SSH_CONFIG = """
Host *
  User root
  StrictHostKeyChecking no
  UserKnownHostsFile /dev/null
  ConnectTimeout=30
"""

# cherrypy likes to sys.exit on error. don't let it take us down too!


def os_exit_noop(status: int) -> None:
    pass


os._exit = os_exit_noop  # type: ignore


# Default container images -----------------------------------------------------
DEFAULT_IMAGE = 'quay.io/ceph/ceph'  # DO NOT ADD TAG TO THIS
DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.43.0'
DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.5.0'
DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:1.0.0'
DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0'
DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0'
DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7'
DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4'
DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23'
DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29'
DEFAULT_JAEGER_AGENT_IMAGE = 'quay.io/jaegertracing/jaeger-agent:1.29'
DEFAULT_JAEGER_QUERY_IMAGE = 'quay.io/jaegertracing/jaeger-query:1.29'
# ------------------------------------------------------------------------------

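# Decorator factory: require that the hostname argument (found at
# `hostname_position` in the wrapped call's positional args) is already
# part of the inventory, otherwise raise an OrchestratorError that
# suggests similar host names.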
def host_exists(hostname_position: int = 1) -> Callable:
    """Check that a hostname exists in the inventory"""
    def inner(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            this = args[0]  # self object
            hostname = args[hostname_position]
            if hostname not in this.cache.get_hosts():
                candidates = ','.join([h for h in this.cache.get_hosts() if h.startswith(hostname)])
                help_msg = f"Did you mean {candidates}?" if candidates else ""
                raise OrchestratorError(
                    f"Cannot find host '{hostname}' in the inventory. {help_msg}")

            return func(*args, **kwargs)
        return wrapper
    return inner


class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                          metaclass=CLICommandMeta):

    _STORE_HOST_PREFIX = "host"

    instance = None
    NOTIFY_TYPES = [NotifyType.mon_map, NotifyType.pg_summary]
    NATIVE_OPTIONS = []  # type: List[Any]
    MODULE_OPTIONS = [
        Option(
            'ssh_config_file',
            type='str',
            default=None,
            desc='customized SSH config file to connect to managed hosts',
        ),
        Option(
            'device_cache_timeout',
            type='secs',
            default=30 * 60,
            desc='seconds to cache device inventory',
        ),
        Option(
            'device_enhanced_scan',
            type='bool',
            default=False,
            desc='Use libstoragemgmt during device scans',
        ),
        Option(
            'inventory_list_all',
            type='bool',
            default=False,
            desc='Whether ceph-volume inventory should report '
                 'more devices (mostly mappers (LVs / mpaths), partitions...)',
        ),
        Option(
            'daemon_cache_timeout',
            type='secs',
            default=10 * 60,
            desc='seconds to cache service (daemon) inventory',
        ),
        Option(
            'facts_cache_timeout',
            type='secs',
            default=1 * 60,
            desc='seconds to cache host facts data',
        ),
        Option(
            'host_check_interval',
            type='secs',
            default=10 * 60,
            desc='how frequently to perform a host check',
        ),
        Option(
            'mode',
            type='str',
            enum_allowed=['root', 'cephadm-package'],
            default='root',
            desc='mode for remote execution of cephadm',
        ),
        Option(
            'container_image_base',
            default=DEFAULT_IMAGE,
            desc='Container image name, without the tag',
            runtime=True,
        ),
        Option(
            'container_image_prometheus',
            default=DEFAULT_PROMETHEUS_IMAGE,
            desc='Prometheus container image',
        ),
        Option(
            'container_image_nvmeof',
            default=DEFAULT_NVMEOF_IMAGE,
            desc='Nvme-of container image',
        ),
        Option(
            'container_image_grafana',
            default=DEFAULT_GRAFANA_IMAGE,
            desc='Grafana container image',
        ),
        Option(
            'container_image_alertmanager',
            default=DEFAULT_ALERT_MANAGER_IMAGE,
            desc='Alertmanager container image',
        ),
        Option(
            'container_image_node_exporter',
            default=DEFAULT_NODE_EXPORTER_IMAGE,
            desc='Node exporter container image',
        ),
        Option(
            'container_image_loki',
            default=DEFAULT_LOKI_IMAGE,
            desc='Loki container image',
        ),
        Option(
            'container_image_promtail',
            default=DEFAULT_PROMTAIL_IMAGE,
            desc='Promtail container image',
        ),
        Option(
            'container_image_haproxy',
            default=DEFAULT_HAPROXY_IMAGE,
            desc='HAproxy container image',
        ),
        Option(
            'container_image_keepalived',
            default=DEFAULT_KEEPALIVED_IMAGE,
            desc='Keepalived container image',
        ),
        Option(
            'container_image_snmp_gateway',
            default=DEFAULT_SNMP_GATEWAY_IMAGE,
            desc='SNMP Gateway container image',
        ),
        Option(
            'container_image_elasticsearch',
            default=DEFAULT_ELASTICSEARCH_IMAGE,
            desc='elasticsearch container image',
        ),
        Option(
            'container_image_jaeger_agent',
            default=DEFAULT_JAEGER_AGENT_IMAGE,
            desc='Jaeger agent container image',
        ),
        Option(
            'container_image_jaeger_collector',
            default=DEFAULT_JAEGER_COLLECTOR_IMAGE,
            desc='Jaeger collector container image',
        ),
        Option(
            'container_image_jaeger_query',
            default=DEFAULT_JAEGER_QUERY_IMAGE,
            desc='Jaeger query container image',
        ),
        Option(
            'warn_on_stray_hosts',
            type='bool',
            default=True,
            desc='raise a health warning if daemons are detected on a host '
                 'that is not managed by cephadm',
        ),
        Option(
            'warn_on_stray_daemons',
            type='bool',
            default=True,
            desc='raise a health warning if daemons are detected '
                 'that are not managed by cephadm',
        ),
        Option(
            'warn_on_failed_host_check',
            type='bool',
            default=True,
            desc='raise a health warning if the host check fails',
        ),
        Option(
            'log_to_cluster',
            type='bool',
            default=True,
            desc='log to the "cephadm" cluster log channel',
        ),
        Option(
            'allow_ptrace',
            type='bool',
            default=False,
            desc='allow SYS_PTRACE capability on ceph containers',
            long_desc='The SYS_PTRACE capability is needed to attach to a '
                      'process with gdb or strace. Enabling this option '
                      'can allow debugging daemons that encounter problems '
                      'at runtime.',
        ),
        Option(
            'container_init',
            type='bool',
            default=True,
            desc='Run podman/docker with `--init`'
        ),
        Option(
            'prometheus_alerts_path',
            type='str',
            default='/etc/prometheus/ceph/ceph_default_alerts.yml',
            desc='location of alerts to include in prometheus deployments',
        ),
        Option(
            'migration_current',
            type='int',
            default=None,
            desc='internal - do not modify',
            # used to track spec and other data migrations.
        ),
        Option(
            'config_dashboard',
            type='bool',
            default=True,
            desc='manage configs like API endpoints in Dashboard.'
        ),
        Option(
            'manage_etc_ceph_ceph_conf',
            type='bool',
            default=False,
            desc='Manage and own /etc/ceph/ceph.conf on the hosts.',
        ),
        Option(
            'manage_etc_ceph_ceph_conf_hosts',
            type='str',
            default='*',
            desc='PlacementSpec describing on which hosts to manage /etc/ceph/ceph.conf',
        ),
        # not used anymore
        Option(
            'registry_url',
            type='str',
            default=None,
            desc='Registry url for login purposes. This is not the default registry'
        ),
        Option(
            'registry_username',
            type='str',
            default=None,
            desc='Custom repository username. Only used for logging into a registry.'
        ),
        Option(
            'registry_password',
            type='str',
            default=None,
            desc='Custom repository password. Only used for logging into a registry.'
        ),
        ####
        Option(
            'registry_insecure',
            type='bool',
            default=False,
            desc='Registry is to be considered insecure (no TLS available). Only for development purposes.'
        ),
        Option(
            'use_repo_digest',
            type='bool',
            default=True,
            desc='Automatically convert image tags to image digest. Make sure all daemons use the same image',
        ),
        Option(
            'config_checks_enabled',
            type='bool',
            default=False,
            desc='Enable or disable the cephadm configuration analysis',
        ),
        Option(
            'default_registry',
            type='str',
            default='docker.io',
            desc='Search-registry to which we should normalize unqualified image names. '
                 'This is not the default registry',
        ),
        Option(
            'max_count_per_host',
            type='int',
            default=10,
            desc='max number of daemons per service per host',
        ),
        Option(
            'autotune_memory_target_ratio',
            type='float',
            default=.7,
            desc='ratio of total system memory to divide amongst autotuned daemons'
        ),
        Option(
            'autotune_interval',
            type='secs',
            default=10 * 60,
            desc='how frequently to autotune daemon memory'
        ),
        Option(
            'use_agent',
            type='bool',
            default=False,
            desc='Use cephadm agent on each host to gather and send metadata'
        ),
        Option(
            'agent_refresh_rate',
            type='secs',
            default=20,
            desc='How often agent on each host will try to gather and send metadata'
        ),
        Option(
            'agent_starting_port',
            type='int',
            default=4721,
            desc='First port agent will try to bind to (will also try up to next 1000 subsequent ports if blocked)'
        ),
        Option(
            'agent_down_multiplier',
            type='float',
            default=3.0,
            desc='Multiplied by agent refresh rate to calculate how long agent must not report before being marked down'
        ),
        Option(
            'hw_monitoring',
            type='bool',
            default=False,
            desc='Deploy hw monitoring daemon on every host.'
        ),
        Option(
            'max_osd_draining_count',
            type='int',
            default=10,
            desc='max number of osds that will be drained simultaneously when osds are removed'
        ),
        Option(
            'service_discovery_port',
            type='int',
            default=8765,
            desc='cephadm service discovery port'
        ),
        Option(
            'cgroups_split',
            type='bool',
            default=True,
            desc='Pass --cgroups=split when cephadm creates containers (currently podman only)'
        ),
        Option(
            'log_refresh_metadata',
            type='bool',
            default=False,
            desc='Log all refresh metadata. Includes daemon, device, and host info collected regularly. Only has effect if logging at debug level'
        ),
        Option(
            'secure_monitoring_stack',
            type='bool',
            default=False,
            desc='Enable TLS security for all the monitoring stack daemons'
        ),
        Option(
            'default_cephadm_command_timeout',
            type='int',
            default=15 * 60,
            desc='Default timeout applied to cephadm commands run directly on '
                 'the host (in seconds)'
        ),
        Option(
            'oob_default_addr',
            type='str',
            default='169.254.1.1',
            desc="Default address for RedFish API (oob management)."
        ),
    ]

    def __init__(self, *args: Any, **kwargs: Any):
        super(CephadmOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid: str = self.get('mon_map')['fsid']
        self.last_monmap: Optional[datetime.datetime] = None

        # for serve()
        self.run = True
        self.event = Event()

        self.ssh = ssh.SSHManager(self)

        if self.get_store('pause'):
            self.paused = True
        else:
            self.paused = False

        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.ssh_config_file = None  # type: Optional[str]
            self.device_cache_timeout = 0
            self.daemon_cache_timeout = 0
            self.facts_cache_timeout = 0
            self.host_check_interval = 0
            self.max_count_per_host = 0
            self.mode = ''
            self.container_image_base = ''
            self.container_image_prometheus = ''
            self.container_image_nvmeof = ''
            self.container_image_grafana = ''
            self.container_image_alertmanager = ''
            self.container_image_node_exporter = ''
            self.container_image_loki = ''
            self.container_image_promtail = ''
            self.container_image_haproxy = ''
            self.container_image_keepalived = ''
            self.container_image_snmp_gateway = ''
            self.container_image_elasticsearch = ''
            self.container_image_jaeger_agent = ''
            self.container_image_jaeger_collector = ''
            self.container_image_jaeger_query = ''
            self.warn_on_stray_hosts = True
            self.warn_on_stray_daemons = True
            self.warn_on_failed_host_check = True
            self.allow_ptrace = False
            self.container_init = True
            self.prometheus_alerts_path = ''
            self.migration_current: Optional[int] = None
            self.config_dashboard = True
            self.manage_etc_ceph_ceph_conf = True
            self.manage_etc_ceph_ceph_conf_hosts = '*'
            self.registry_url: Optional[str] = None
            self.registry_username: Optional[str] = None
            self.registry_password: Optional[str] = None
            self.registry_insecure: bool = False
            self.use_repo_digest = True
            self.config_checks_enabled = False
            self.default_registry = ''
            self.autotune_memory_target_ratio = 0.0
            self.autotune_interval = 0
            self.ssh_user: Optional[str] = None
            self._ssh_options: Optional[str] = None
            self.tkey = NamedTemporaryFile()
            self.ssh_config_fname: Optional[str] = None
            self.ssh_config: Optional[str] = None
            self._temp_files: List = []
            self.ssh_key: Optional[str] = None
            self.ssh_pub: Optional[str] = None
            self.ssh_cert: Optional[str] = None
            self.use_agent = False
            self.agent_refresh_rate = 0
            self.agent_down_multiplier = 0.0
            self.agent_starting_port = 0
            self.hw_monitoring = False
            self.service_discovery_port = 0
            self.secure_monitoring_stack = False
            self.apply_spec_fails: List[Tuple[str, str]] = []
            self.max_osd_draining_count = 10
            self.device_enhanced_scan = False
            self.inventory_list_all = False
            self.cgroups_split = True
            self.log_refresh_metadata = False
            self.default_cephadm_command_timeout = 0
            self.oob_default_addr = ''

        self.notify(NotifyType.mon_map, None)
        self.config_notify()

        path = self.get_ceph_option('cephadm_path')
        try:
            assert isinstance(path, str)
            with open(path, 'rb') as f:
                self._cephadm = f.read()
        except (IOError, TypeError) as e:
            raise RuntimeError("unable to read cephadm at '%s': %s" % (
                path, str(e)))

        self.cephadm_binary_path = self._get_cephadm_binary_path()

        self._worker_pool = multiprocessing.pool.ThreadPool(10)

        self.ssh._reconfig_ssh()

        CephadmOrchestrator.instance = self

        self.upgrade = CephadmUpgrade(self)

        self.health_checks: Dict[str, dict] = {}

        self.inventory = Inventory(self)

        self.cache = HostCache(self)
        self.cache.load()

        self.node_proxy_cache = NodeProxyCache(self)
        self.node_proxy_cache.load()

        self.agent_cache = AgentCache(self)
        self.agent_cache.load()

        self.to_remove_osds = OSDRemovalQueue(self)
        self.to_remove_osds.load_from_store()

        self.spec_store = SpecStore(self)
        self.spec_store.load()

        self.keys = ClientKeyringStore(self)
        self.keys.load()

        self.tuned_profiles = TunedProfileStore(self)
        self.tuned_profiles.load()

        self.tuned_profile_utils = TunedProfileUtils(self)

        # ensure the host lists are in sync
        for h in self.inventory.keys():
            if h not in self.cache.daemons:
                self.cache.prime_empty_host(h)
        for h in self.cache.get_hosts():
            if h not in self.inventory:
                self.cache.rm_host(h)

        # in-memory only.
        self.events = EventStore(self)
        self.offline_hosts: Set[str] = set()

        self.migration = Migrations(self)

        _service_classes: Sequence[Type[CephadmService]] = [
            OSDService, NFSService, MonService, MgrService, MdsService,
            RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
            PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService,
            IngressService, CustomContainerService, CephfsMirrorService, NvmeofService,
            CephadmAgent, CephExporterService, SNMPGatewayService, ElasticSearchService,
            JaegerQueryService, JaegerAgentService, JaegerCollectorService, NodeProxy
        ]

        # https://github.com/python/mypy/issues/8993
        self.cephadm_services: Dict[str, CephadmService] = {
            cls.TYPE: cls(self) for cls in _service_classes}  # type: ignore

        self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
        self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])
        self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi'])
        self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof'])
        self.node_proxy_service: NodeProxy = cast(NodeProxy, self.cephadm_services['node-proxy'])

        self.scheduled_async_actions: List[Callable] = []

        self.template = TemplateMgr(self)

        self.requires_post_actions: Set[str] = set()
        self.need_connect_dashboard_rgw = False

        self.config_checker = CephadmConfigChecks(self)

        self.http_server = CephadmHttpServer(self)
        self.http_server.start()

        self.node_proxy = NodeProxy(self)

        self.agent_helpers = CephadmAgentHelpers(self)
        if self.use_agent:
            self.agent_helpers._apply_agent()

        self.offline_watcher = OfflineHostWatcher(self)
        self.offline_watcher.start()

    def shutdown(self) -> None:
        self.log.debug('shutdown')
        self._worker_pool.close()
        self._worker_pool.join()
        self.http_server.shutdown()
        self.offline_watcher.shutdown()
        self.run = False
        self.event.set()

    def _get_cephadm_service(self, service_type: str) -> CephadmService:
        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
        return self.cephadm_services[service_type]

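    # Path under which the cephadm binary payload is placed on managed hosts;
    # the name embeds a sha256 digest of the payload so hosts always use the
    # version shipped by this mgr.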
    def _get_cephadm_binary_path(self) -> str:
        import hashlib
        m = hashlib.sha256()
        m.update(self._cephadm)
        return f'/var/lib/ceph/{self._cluster_fsid}/cephadm.{m.hexdigest()}'

    def _kick_serve_loop(self) -> None:
        self.log.debug('_kick_serve_loop')
        self.event.set()

    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        # for ssh in serve
        self.event_loop = ssh.EventLoopThread()

        serve = CephadmServe(self)
        serve.serve()

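    # Run a coroutine on the SSH event loop thread and block the calling
    # (mgr) thread until it finishes or the timeout expires.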
    def wait_async(self, coro: Awaitable[T], timeout: Optional[int] = None) -> T:
        if not timeout:
            timeout = self.default_cephadm_command_timeout
            # put a lower bound of 60 seconds in case users
            # accidentally set it to something unreasonable.
            # For example if they thought it was in minutes
            # rather than seconds
            if timeout < 60:
                self.log.info(f'Found default timeout set to {timeout}. Instead trying minimum of 60.')
                timeout = 60
        return self.event_loop.get_result(coro, timeout)

    @contextmanager
    def async_timeout_handler(self, host: Optional[str] = '',
                              cmd: Optional[str] = '',
                              timeout: Optional[int] = None) -> Iterator[None]:
        # this is meant to catch asyncio.TimeoutError and convert it into an
        # OrchestratorError which much of the cephadm codebase is better equipped to handle.
        # If the command being run, the host it is run on, or the timeout being used
        # are provided, that will be included in the OrchestratorError's message
        try:
            yield
        except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
            err_str: str = ''
            if cmd:
                err_str = f'Command "{cmd}" timed out '
            else:
                err_str = 'Command timed out '
            if host:
                err_str += f'on host {host} '
            if timeout:
                err_str += f'(non-default {timeout} second timeout)'
            else:
                err_str += (f'(default {self.default_cephadm_command_timeout} second timeout)')
            raise OrchestratorError(err_str)
        except concurrent.futures.CancelledError as e:
            err_str = ''
            if cmd:
                err_str = f'Command "{cmd}" failed '
            else:
                err_str = 'Command failed '
            if host:
                err_str += f'on host {host} '
            err_str += f' - {str(e)}'
            raise OrchestratorError(err_str)

    def set_container_image(self, entity: str, image: str) -> None:
        self.check_mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': image,
            'who': entity,
        })

    def config_notify(self) -> None:
        """
        This method is called whenever one of our config options is changed.

        TODO: this method should be moved into mgr_module.py
        """
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,  # type: ignore
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore

        self.event.set()

    def notify(self, notify_type: NotifyType, notify_id: Optional[str]) -> None:
        if notify_type == NotifyType.mon_map:
            # get monmap mtime so we can refresh configs when mons change
            monmap = self.get('mon_map')
            self.last_monmap = str_to_datetime(monmap['modified'])
            if self.last_monmap and self.last_monmap > datetime_now():
                self.last_monmap = None  # just in case clocks are skewed
            if getattr(self, 'manage_etc_ceph_ceph_conf', False):
                # getattr, due to notify() being called before config_notify()
                self._kick_serve_loop()
        if notify_type == NotifyType.pg_summary:
            self._trigger_osd_removal()

    def _trigger_osd_removal(self) -> None:
        remove_queue = self.to_remove_osds.as_osd_ids()
        if not remove_queue:
            return
        data = self.get("osd_stats")
        for osd in data.get('osd_stats', []):
            if osd.get('num_pgs') == 0:
                # if _ANY_ osd that is currently in the queue appears to be empty,
                # start the removal process
                if int(osd.get('osd')) in remove_queue:
                    self.log.debug('Found empty osd. Starting removal process')
                    # if the osd that is now empty is also part of the removal queue
                    # start the process
                    self._kick_serve_loop()

    def pause(self) -> None:
        if not self.paused:
            self.log.info('Paused')
            self.set_store('pause', 'true')
            self.paused = True
            # wake loop so we update the health status
            self._kick_serve_loop()

    def resume(self) -> None:
        if self.paused:
            self.log.info('Resumed')
            self.paused = False
            self.set_store('pause', None)
        # unconditionally wake loop so that 'orch resume' can be used to kick
        # cephadm
        self._kick_serve_loop()

    def get_unique_name(
            self,
            daemon_type: str,
            host: str,
            existing: List[orchestrator.DaemonDescription],
            prefix: Optional[str] = None,
            forcename: Optional[str] = None,
            rank: Optional[int] = None,
            rank_generation: Optional[int] = None,
    ) -> str:
        """
        Generate a unique random service name
        """
        suffix = daemon_type not in [
            'mon', 'crash', 'ceph-exporter', 'node-proxy',
            'prometheus', 'node-exporter', 'grafana', 'alertmanager',
            'container', 'agent', 'snmp-gateway', 'loki', 'promtail',
            'elasticsearch', 'jaeger-collector', 'jaeger-agent', 'jaeger-query'
        ]
        if forcename:
            if len([d for d in existing if d.daemon_id == forcename]):
                raise orchestrator.OrchestratorValidationError(
                    f'name {daemon_type}.{forcename} already in use')
            return forcename

        if '.' in host:
            host = host.split('.')[0]
        while True:
            if prefix:
                name = prefix + '.'
            else:
                name = ''
            if rank is not None and rank_generation is not None:
                name += f'{rank}.{rank_generation}.'
            name += host
            if suffix:
                name += '.' + ''.join(random.choice(string.ascii_lowercase)
                                      for _ in range(6))
            if len([d for d in existing if d.daemon_id == name]):
                if not suffix:
                    raise orchestrator.OrchestratorValidationError(
                        f'name {daemon_type}.{name} already in use')
                self.log.debug('name %s exists, trying again', name)
                continue
            return name

    def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
        if ssh_config is None or len(ssh_config.strip()) == 0:
            raise OrchestratorValidationError('ssh_config cannot be empty')
        # StrictHostKeyChecking is [yes|no] ?
        res = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
        if not res:
            raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
        for s in res:
            if 'ask' in s.lower():
                raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')

    def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
        if not os.path.isfile(ssh_config_fname):
            raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
                ssh_config_fname))

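    # Translate the JSON produced by `cephadm ls` on a host into
    # DaemonDescription objects and refresh that host's daemon cache.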
    def _process_ls_output(self, host: str, ls: List[Dict[str, Any]]) -> None:
        def _as_datetime(value: Optional[str]) -> Optional[datetime.datetime]:
            return str_to_datetime(value) if value is not None else None

        dm = {}
        for d in ls:
            if not d['style'].startswith('cephadm'):
                continue
            if d['fsid'] != self._cluster_fsid:
                continue
            if '.' not in d['name']:
                continue
            daemon_type = d['name'].split('.')[0]
            if daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
                logger.warning(f"Found unknown daemon type {daemon_type} on host {host}")
                continue

            container_id = d.get('container_id')
            if container_id:
                # shorten the hash
                container_id = container_id[0:12]
            rank = int(d['rank']) if d.get('rank') is not None else None
            rank_generation = int(d['rank_generation']) if d.get(
                'rank_generation') is not None else None
            status, status_desc = None, 'unknown'
            if 'state' in d:
                status_desc = d['state']
                status = {
                    'running': DaemonDescriptionStatus.running,
                    'stopped': DaemonDescriptionStatus.stopped,
                    'error': DaemonDescriptionStatus.error,
                    'unknown': DaemonDescriptionStatus.error,
                }[d['state']]
            sd = orchestrator.DaemonDescription(
                daemon_type=daemon_type,
                daemon_id='.'.join(d['name'].split('.')[1:]),
                hostname=host,
                container_id=container_id,
                container_image_id=d.get('container_image_id'),
                container_image_name=d.get('container_image_name'),
                container_image_digests=d.get('container_image_digests'),
                version=d.get('version'),
                status=status,
                status_desc=status_desc,
                created=_as_datetime(d.get('created')),
                started=_as_datetime(d.get('started')),
                last_refresh=datetime_now(),
                last_configured=_as_datetime(d.get('last_configured')),
                last_deployed=_as_datetime(d.get('last_deployed')),
                memory_usage=d.get('memory_usage'),
                memory_request=d.get('memory_request'),
                memory_limit=d.get('memory_limit'),
                cpu_percentage=d.get('cpu_percentage'),
                service_name=d.get('service_name'),
                ports=d.get('ports'),
                ip=d.get('ip'),
                deployed_by=d.get('deployed_by'),
                rank=rank,
                rank_generation=rank_generation,
                extra_container_args=d.get('extra_container_args'),
                extra_entrypoint_args=d.get('extra_entrypoint_args'),
            )
            dm[sd.name()] = sd
        self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
        self.cache.update_host_daemons(host, dm)
        self.cache.save_host(host)
        return None

    def update_watched_hosts(self) -> None:
        # currently, we are watching hosts with nfs daemons
        hosts_to_watch = [d.hostname for d in self.cache.get_daemons(
        ) if d.daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES]
        self.offline_watcher.set_hosts(list(set([h for h in hosts_to_watch if h is not None])))

    def offline_hosts_remove(self, host: str) -> None:
        if host in self.offline_hosts:
            self.offline_hosts.remove(host)

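    # Raise (or clear) the CEPHADM_FAILED_DAEMON health warning based on the
    # daemons the cache currently reports in an error state.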
    def update_failed_daemon_health_check(self) -> None:
        failed_daemons = []
        for dd in self.cache.get_error_daemons():
            if dd.daemon_type != 'agent':  # agents tracked by CEPHADM_AGENT_DOWN
                failed_daemons.append('daemon %s on %s is in %s state' % (
                    dd.name(), dd.hostname, dd.status_desc
                ))
        self.remove_health_warning('CEPHADM_FAILED_DAEMON')
        if failed_daemons:
            self.set_health_warning('CEPHADM_FAILED_DAEMON', f'{len(failed_daemons)} failed cephadm daemon(s)', len(
                failed_daemons), failed_daemons)

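    # Return the first IP configured on `host` that falls within one of the
    # networks listed in the service spec, or None if nothing overlaps.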
    def get_first_matching_network_ip(self, host: str, sspec: ServiceSpec) -> Optional[str]:
        sspec_networks = sspec.networks
        for subnet, ifaces in self.cache.networks.get(host, {}).items():
            host_network = ipaddress.ip_network(subnet)
            for spec_network_str in sspec_networks:
                spec_network = ipaddress.ip_network(spec_network_str)
                if host_network.overlaps(spec_network):
                    return list(ifaces.values())[0][0]
        logger.error(f'{spec_network} from {sspec.service_name()} spec does not overlap with {host_network} on {host}')
        return None

    @staticmethod
    def can_run() -> Tuple[bool, str]:
        if asyncssh is not None:
            return True, ""
        else:
            return False, "loading asyncssh library:{}".format(
                asyncssh_import_error)

    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        The cephadm orchestrator is always available.
        """
        ok, err = self.can_run()
        if not ok:
            return ok, err, {}
        if not self.ssh_key or not self.ssh_pub:
            return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`', {}

        # mypy is unable to determine type for _processes since it's private
        worker_count: int = self._worker_pool._processes  # type: ignore
        ret = {
            "workers": worker_count,
            "paused": self.paused,
        }

        return True, err, ret

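    # Store a new SSH-related value, reconfigure the SSH manager, and verify
    # connectivity against one known host; on failure the previous value is
    # restored before raising.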
    def _validate_and_set_ssh_val(self, what: str, new: Optional[str], old: Optional[str]) -> None:
        self.set_store(what, new)
        self.ssh._reconfig_ssh()
        if self.cache.get_hosts():
            # Can't check anything without hosts
            host = self.cache.get_hosts()[0]
            r = CephadmServe(self)._check_host(host)
            if r is not None:
                # connection failed reset user
                self.set_store(what, old)
                self.ssh._reconfig_ssh()
                raise OrchestratorError('ssh connection %s@%s failed' % (self.ssh_user, host))
        self.log.info(f'Set ssh {what}')

    @orchestrator._cli_write_command(
        prefix='cephadm set-ssh-config')
    def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Set the ssh_config file (use -i <ssh_config>)
        """
        # Set an ssh_config file provided from stdin

        old = self.ssh_config
        if inbuf == old:
            return 0, "value unchanged", ""
        self.validate_ssh_config_content(inbuf)
        self._validate_and_set_ssh_val('ssh_config', inbuf, old)
        return 0, "", ""

    @orchestrator._cli_write_command('cephadm clear-ssh-config')
    def _clear_ssh_config(self) -> Tuple[int, str, str]:
        """
        Clear the ssh_config file
        """
        # Clear the ssh_config file provided from stdin
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        self.log.info('Cleared ssh_config')
        self.ssh._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_read_command('cephadm get-ssh-config')
    def _get_ssh_config(self) -> HandleCommandResult:
        """
        Returns the ssh config as used by cephadm
        """
        if self.ssh_config_file:
            self.validate_ssh_config_fname(self.ssh_config_file)
            with open(self.ssh_config_file) as f:
                return HandleCommandResult(stdout=f.read())
        ssh_config = self.get_store("ssh_config")
        if ssh_config:
            return HandleCommandResult(stdout=ssh_config)
        return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)

    @orchestrator._cli_write_command('cephadm generate-key')
    def _generate_key(self) -> Tuple[int, str, str]:
        """
        Generate a cluster SSH key (if not present)
        """
        if not self.ssh_pub or not self.ssh_key:
            self.log.info('Generating ssh key...')
            tmp_dir = TemporaryDirectory()
            path = tmp_dir.name + '/key'
            try:
                subprocess.check_call([
                    '/usr/bin/ssh-keygen',
                    '-C', 'ceph-%s' % self._cluster_fsid,
                    '-N', '',
                    '-f', path
                ])
                with open(path, 'r') as f:
                    secret = f.read()
                with open(path + '.pub', 'r') as f:
                    pub = f.read()
            finally:
                os.unlink(path)
                os.unlink(path + '.pub')
                tmp_dir.cleanup()
            self.set_store('ssh_identity_key', secret)
            self.set_store('ssh_identity_pub', pub)
            self.ssh._reconfig_ssh()
        return 0, '', ''

    @orchestrator._cli_write_command(
        'cephadm set-priv-key')
    def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """Set cluster SSH private key (use -i <private_key>)"""
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty private ssh key provided"
        old = self.ssh_key
        if inbuf == old:
            return 0, "value unchanged", ""
        self._validate_and_set_ssh_val('ssh_identity_key', inbuf, old)
        self.log.info('Set ssh private key')
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm set-pub-key')
    def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """Set cluster SSH public key (use -i <public_key>)"""
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty public ssh key provided"
        old = self.ssh_pub
        if inbuf == old:
            return 0, "value unchanged", ""
        self._validate_and_set_ssh_val('ssh_identity_pub', inbuf, old)
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm set-signed-cert')
    def _set_signed_cert(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """Set a signed cert if CA signed keys are being used (use -i <cert_filename>)"""
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty cert file provided"
        old = self.ssh_cert
        if inbuf == old:
            return 0, "value unchanged", ""
        self._validate_and_set_ssh_val('ssh_identity_cert', inbuf, old)
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm clear-key')
    def _clear_key(self) -> Tuple[int, str, str]:
        """Clear cluster SSH key"""
        self.set_store('ssh_identity_key', None)
        self.set_store('ssh_identity_pub', None)
        self.set_store('ssh_identity_cert', None)
        self.ssh._reconfig_ssh()
        self.log.info('Cleared cluster SSH key')
        return 0, '', ''

    @orchestrator._cli_read_command(
        'cephadm get-pub-key')
    def _get_pub_key(self) -> Tuple[int, str, str]:
        """Show SSH public key for connecting to cluster hosts"""
        if self.ssh_pub:
            return 0, self.ssh_pub, ''
        else:
            return -errno.ENOENT, '', 'No cluster SSH key defined'

    @orchestrator._cli_read_command(
        'cephadm get-signed-cert')
    def _get_signed_cert(self) -> Tuple[int, str, str]:
        """Show SSH signed cert for connecting to cluster hosts using CA signed keys"""
        if self.ssh_cert:
            return 0, self.ssh_cert, ''
        else:
            return -errno.ENOENT, '', 'No signed cert defined'

    @orchestrator._cli_read_command(
        'cephadm get-user')
    def _get_user(self) -> Tuple[int, str, str]:
        """
        Show user for SSHing to cluster hosts
        """
        if self.ssh_user is None:
            return -errno.ENOENT, '', 'No cluster SSH user configured'
        else:
            return 0, self.ssh_user, ''

    @orchestrator._cli_read_command(
        'cephadm set-user')
    def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
        """
        Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users
        """
        current_user = self.ssh_user
        if user == current_user:
            return 0, "value unchanged", ""

        self._validate_and_set_ssh_val('ssh_user', user, current_user)
        current_ssh_config = self._get_ssh_config()
        new_ssh_config = re.sub(r"(\s{2}User\s)(.*)", r"\1" + user, current_ssh_config.stdout)
        self._set_ssh_config(new_ssh_config)

        msg = 'ssh user set to %s' % user
        if user != 'root':
            msg += '. sudo will be used'
        self.log.info(msg)
        return 0, msg, ''

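    # Example invocation (placeholder values):
    #   ceph cephadm registry-login my-registry.example.com my-user my-password
    # or pass the same fields as JSON via `-i <file>`.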
    @orchestrator._cli_read_command(
        'cephadm registry-login')
    def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Set custom registry login info by providing url, username and password or json file with login info (-i <file>)
        """
        # if password not given in command line, get it through file input
        if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
            return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
                                       "or -i <login credentials json file>")
        elif (url and username and password):
            registry_json = {'url': url, 'username': username, 'password': password}
        else:
            assert isinstance(inbuf, str)
            registry_json = json.loads(inbuf)
            if "url" not in registry_json or "username" not in registry_json or "password" not in registry_json:
                return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
                                           "Please setup json file as\n"
                                           "{\n"
                                           " \"url\": \"REGISTRY_URL\",\n"
                                           " \"username\": \"REGISTRY_USERNAME\",\n"
                                           " \"password\": \"REGISTRY_PASSWORD\"\n"
                                           "}\n")

        # verify login info works by attempting login on random host
        host = None
        for host_name in self.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        with self.async_timeout_handler(host, 'cephadm registry-login'):
            r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json))
        if r is not None:
            return 1, '', r
        # if logins succeeded, store info
        self.log.debug("Host logins successful. Storing login info.")
        self.set_store('registry_credentials', json.dumps(registry_json))
        # distribute new login info to all hosts
        self.cache.distribute_new_registry_login_info()
        return 0, "registry login scheduled", ''

    @orchestrator._cli_read_command('cephadm check-host')
    def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        """Check whether we can access and manage a remote host"""
        try:
            with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'):
                out, err, code = self.wait_async(
                    CephadmServe(self)._run_cephadm(
                        host, cephadmNoImage, 'check-host', ['--expect-hostname', host],
                        addr=addr, error_ok=True, no_fsid=True))
            if code:
                return 1, '', ('check-host failed:\n' + '\n'.join(err))
        except ssh.HostConnectionError as e:
            self.log.exception(
                f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}")
            return 1, '', ('check-host failed:\n'
                           + f"Failed to connect to {host} at address ({e.addr}): {str(e)}")
        except OrchestratorError:
            self.log.exception(f"check-host failed for '{host}'")
            return 1, '', ('check-host failed:\n'
                           + f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)

    @orchestrator._cli_read_command(
        'cephadm prepare-host')
    def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        """Prepare a remote host for use with cephadm"""
        with self.async_timeout_handler(host, 'cephadm prepare-host'):
            out, err, code = self.wait_async(
                CephadmServe(self)._run_cephadm(
                    host, cephadmNoImage, 'prepare-host', ['--expect-hostname', host],
                    addr=addr, error_ok=True, no_fsid=True))
        if code:
            return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)

    @orchestrator._cli_write_command(
        prefix='cephadm set-extra-ceph-conf')
    def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
        """
        Text that is appended to all daemon's ceph.conf.
        Mainly a workaround, till `config generate-minimal-conf` generates
        a complete ceph.conf.

        Warning: this is a dangerous operation.
        """
        if inbuf:
            # sanity check.
            cp = ConfigParser()
            cp.read_string(inbuf, source='<infile>')

        self.set_store("extra_ceph_conf", json.dumps({
            'conf': inbuf,
            'last_modified': datetime_to_str(datetime_now())
        }))
        self.log.info('Set extra_ceph_conf')
        self._kick_serve_loop()
        return HandleCommandResult()

    @orchestrator._cli_read_command(
        'cephadm get-extra-ceph-conf')
    def _get_extra_ceph_conf(self) -> HandleCommandResult:
        """
        Get extra ceph conf that is appended
        """
        return HandleCommandResult(stdout=self.extra_ceph_conf().conf)

    @orchestrator._cli_read_command('cephadm config-check ls')
    def _config_checks_list(self, format: Format = Format.plain) -> HandleCommandResult:
        """List the available configuration checks and their current state"""

        if format not in [Format.plain, Format.json, Format.json_pretty]:
            return HandleCommandResult(
                retval=1,
                stderr="Requested format is not supported when listing configuration checks"
            )

        if format in [Format.json, Format.json_pretty]:
            return HandleCommandResult(
                stdout=to_format(self.config_checker.health_checks,
                                 format,
                                 many=True,
                                 cls=None))

        # plain formatting
        table = PrettyTable(
            ['NAME',
             'HEALTHCHECK',
             'STATUS',
             'DESCRIPTION'
             ], border=False)
        table.align['NAME'] = 'l'
        table.align['HEALTHCHECK'] = 'l'
        table.align['STATUS'] = 'l'
        table.align['DESCRIPTION'] = 'l'
        table.left_padding_width = 0
        table.right_padding_width = 2
        for c in self.config_checker.health_checks:
            table.add_row((
                c.name,
                c.healthcheck_name,
                c.status,
                c.description,
            ))

        return HandleCommandResult(stdout=table.get_string())

    @orchestrator._cli_read_command('cephadm config-check status')
    def _config_check_status(self) -> HandleCommandResult:
        """Show whether the configuration checker feature is enabled/disabled"""
        status = self.config_checks_enabled
        return HandleCommandResult(stdout="Enabled" if status else "Disabled")

    @orchestrator._cli_write_command('cephadm config-check enable')
    def _config_check_enable(self, check_name: str) -> HandleCommandResult:
        """Enable a specific configuration check"""
        if not self._config_check_valid(check_name):
            return HandleCommandResult(retval=1, stderr="Invalid check name")

        err, msg = self._update_config_check(check_name, 'enabled')
        if err:
            return HandleCommandResult(
                retval=err,
                stderr=f"Failed to enable check '{check_name}' : {msg}")

        return HandleCommandResult(stdout="ok")

    @orchestrator._cli_write_command('cephadm config-check disable')
    def _config_check_disable(self, check_name: str) -> HandleCommandResult:
        """Disable a specific configuration check"""
        if not self._config_check_valid(check_name):
            return HandleCommandResult(retval=1, stderr="Invalid check name")

        err, msg = self._update_config_check(check_name, 'disabled')
        if err:
            return HandleCommandResult(retval=err, stderr=f"Failed to disable check '{check_name}': {msg}")
        else:
            # drop any outstanding raised healthcheck for this check
            config_check = self.config_checker.lookup_check(check_name)
            if config_check:
                if config_check.healthcheck_name in self.health_checks:
                    self.health_checks.pop(config_check.healthcheck_name, None)
                    self.set_health_checks(self.health_checks)
            else:
                self.log.error(
                    f"Unable to resolve a check name ({check_name}) to a healthcheck definition?")

        return HandleCommandResult(stdout="ok")

    def _config_check_valid(self, check_name: str) -> bool:
        return check_name in [chk.name for chk in self.config_checker.health_checks]

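    # Persist the enabled/disabled state of a named configuration check in the
    # mgr key/value store (the `config_checks` entry).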
    def _update_config_check(self, check_name: str, status: str) -> Tuple[int, str]:
        checks_raw = self.get_store('config_checks')
        if not checks_raw:
            return 1, "config_checks setting is not available"

        checks = json.loads(checks_raw)
        checks.update({
            check_name: status
        })
        self.log.info(f"updated config check '{check_name}' : {status}")
        self.set_store('config_checks', json.dumps(checks))
        return 0, ""

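    # Parsed form of the 'extra_ceph_conf' store entry written by
    # _set_extra_ceph_conf above.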
1431 class ExtraCephConf(NamedTuple):
1432 conf: str
1433 last_modified: Optional[datetime.datetime]
1434
1435 def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
1436 data = self.get_store('extra_ceph_conf')
1437 if not data:
1438 return CephadmOrchestrator.ExtraCephConf('', None)
1439 try:
1440 j = json.loads(data)
1441 except ValueError:
adb31ebb
TL
1442 msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
1443 self.log.exception('%s: \'%s\'', msg, data)
f91f0fd5
TL
1444 return CephadmOrchestrator.ExtraCephConf('', None)
1445 return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))
1446
1447 def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
1448 conf = self.extra_ceph_conf()
1449 if not conf.last_modified:
1450 return False
1451 return conf.last_modified > dt
1452
f67539c2
TL
1453 @orchestrator._cli_write_command(
1454 'cephadm osd activate'
1455 )
1456 def _osd_activate(self, host: List[str]) -> HandleCommandResult:
1457 """
1458 Start OSD containers for existing OSDs
1459 """
1460
1461 @forall_hosts
1462 def run(h: str) -> str:
1e59de90
TL
1463 with self.async_timeout_handler(h, 'cephadm deploy (osd daemon)'):
1464 return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd'))
f67539c2
TL
1465
1466 return HandleCommandResult(stdout='\n'.join(run(host)))
1467
b3b6e05e
TL
1468 @orchestrator._cli_read_command('orch client-keyring ls')
1469 def _client_keyring_ls(self, format: Format = Format.plain) -> HandleCommandResult:
522d829b
TL
1470 """
1471 List client keyrings under cephadm management
1472 """
b3b6e05e
TL
1473 if format != Format.plain:
1474 output = to_format(self.keys.keys.values(), format, many=True, cls=ClientKeyringSpec)
1475 else:
1476 table = PrettyTable(
1477 ['ENTITY', 'PLACEMENT', 'MODE', 'OWNER', 'PATH'],
1478 border=False)
1479 table.align = 'l'
1480 table.left_padding_width = 0
1481 table.right_padding_width = 2
1482 for ks in sorted(self.keys.keys.values(), key=lambda ks: ks.entity):
1483 table.add_row((
1484 ks.entity, ks.placement.pretty_str(),
1485 utils.file_mode_to_str(ks.mode),
1486 f'{ks.uid}:{ks.gid}',
1487 ks.path,
1488 ))
1489 output = table.get_string()
1490 return HandleCommandResult(stdout=output)
1491
1492 @orchestrator._cli_write_command('orch client-keyring set')
1493 def _client_keyring_set(
1494 self,
1495 entity: str,
1496 placement: str,
1497 owner: Optional[str] = None,
1498 mode: Optional[str] = None,
1499 ) -> HandleCommandResult:
522d829b
TL
1500 """
1501 Add or update client keyring under cephadm management
1502 """
b3b6e05e
TL
1503 if not entity.startswith('client.'):
1504 raise OrchestratorError('entity must start with client.')
1505 if owner:
1506 try:
1507 uid, gid = map(int, owner.split(':'))
1508 except Exception:
1509 raise OrchestratorError('owner must look like "<uid>:<gid>", e.g., "0:0"')
1510 else:
1511 uid = 0
1512 gid = 0
1513 if mode:
1514 try:
1515 imode = int(mode, 8)
1516 except Exception:
1517 raise OrchestratorError('mode must be an octal mode, e.g. "600"')
1518 else:
1519 imode = 0o600
1520 pspec = PlacementSpec.from_string(placement)
1521 ks = ClientKeyringSpec(entity, pspec, mode=imode, uid=uid, gid=gid)
1522 self.keys.update(ks)
1523 self._kick_serve_loop()
1524 return HandleCommandResult()
1525
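# Illustrative usage (entity, placement label, mode and owner are examples):
#   ceph orch client-keyring set client.foo label:foo --mode 640 --owner 0:0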
1526 @orchestrator._cli_write_command('orch client-keyring rm')
1527 def _client_keyring_rm(
1528 self,
1529 entity: str,
1530 ) -> HandleCommandResult:
522d829b
TL
1531 """
1532 Remove client keyring from cephadm management
1533 """
b3b6e05e
TL
1534 self.keys.rm(entity)
1535 self._kick_serve_loop()
1536 return HandleCommandResult()
1537
f91f0fd5 1538 def _get_container_image(self, daemon_name: str) -> Optional[str]:
f6b5b4d7 1539 daemon_type = daemon_name.split('.', 1)[0] # type: ignore
f67539c2 1540 image: Optional[str] = None
a4b75251 1541 if daemon_type in CEPH_IMAGE_TYPES:
f6b5b4d7 1542 # get container image
f67539c2
TL
1543 image = str(self.get_foreign_ceph_option(
1544 utils.name_to_config_section(daemon_name),
1545 'container_image'
1546 )).strip()
f6b5b4d7
TL
1547 elif daemon_type == 'prometheus':
1548 image = self.container_image_prometheus
aee94f69
TL
1549 elif daemon_type == 'nvmeof':
1550 image = self.container_image_nvmeof
f6b5b4d7
TL
1551 elif daemon_type == 'grafana':
1552 image = self.container_image_grafana
1553 elif daemon_type == 'alertmanager':
1554 image = self.container_image_alertmanager
1555 elif daemon_type == 'node-exporter':
1556 image = self.container_image_node_exporter
33c7a0ef
TL
1557 elif daemon_type == 'loki':
1558 image = self.container_image_loki
1559 elif daemon_type == 'promtail':
1560 image = self.container_image_promtail
f67539c2
TL
1561 elif daemon_type == 'haproxy':
1562 image = self.container_image_haproxy
1563 elif daemon_type == 'keepalived':
1564 image = self.container_image_keepalived
1e59de90
TL
1565 elif daemon_type == 'elasticsearch':
1566 image = self.container_image_elasticsearch
1567 elif daemon_type == 'jaeger-agent':
1568 image = self.container_image_jaeger_agent
1569 elif daemon_type == 'jaeger-collector':
1570 image = self.container_image_jaeger_collector
1571 elif daemon_type == 'jaeger-query':
1572 image = self.container_image_jaeger_query
f91f0fd5
TL
1573 elif daemon_type == CustomContainerService.TYPE:
1574 # The image can't be resolved, the necessary information
1575 # is only available when a container is deployed (given
1576 # via spec).
1577 image = None
20effc67
TL
1578 elif daemon_type == 'snmp-gateway':
1579 image = self.container_image_snmp_gateway
f6b5b4d7
TL
1580 else:
1581 assert False, daemon_type
1582
1583 self.log.debug('%s container image %s' % (daemon_name, image))
1584
1585 return image
1586
b3b6e05e
TL
1587 def _check_valid_addr(self, host: str, addr: str) -> str:
1588 # make sure hostname is resolvable before trying to make a connection
1589 try:
1590 ip_addr = utils.resolve_ip(addr)
1591 except OrchestratorError as e:
1592 msg = str(e) + f'''
1593You may need to supply an address for {addr}
1594
1595Please make sure that the host is reachable and accepts connections using the cephadm SSH key
1596To add the cephadm SSH key to the host:
1597> ceph cephadm get-pub-key > ~/ceph.pub
1598> ssh-copy-id -f -i ~/ceph.pub {self.ssh_user}@{addr}
1599
1600To check that the host is reachable open a new shell with the --no-hosts flag:
1601> cephadm shell --no-hosts
1602
1603Then run the following:
1604> ceph cephadm get-ssh-config > ssh_config
1605> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
1606> chmod 0600 ~/cephadm_private_key
1607> ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}'''
1608 raise OrchestratorError(msg)
1609
33c7a0ef
TL
1610 if ipaddress.ip_address(ip_addr).is_loopback and host == addr:
1611 # if this is a re-add, use old address. otherwise error
1612 if host not in self.inventory or self.inventory.get_addr(host) == host:
1613 raise OrchestratorError(
1614 (f'Cannot automatically resolve ip address of host {host}. Ip resolved to loopback address: {ip_addr}\n'
1615 + f'Please explicitly provide the address (ceph orch host add {host} --addr <ip-addr>)'))
1616 self.log.debug(
1617 f'Received loopback address resolving ip for {host}: {ip_addr}. Falling back to previous address.')
1618 ip_addr = self.inventory.get_addr(host)
1e59de90
TL
1619 try:
1620 with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'):
1621 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
1622 host, cephadmNoImage, 'check-host',
1623 ['--expect-hostname', host],
1624 addr=addr,
1625 error_ok=True, no_fsid=True))
1626 if code:
1627 msg = 'check-host failed:\n' + '\n'.join(err)
1628 # err will contain stdout and stderr, so we filter on the message text to
1629 # only show the errors
1630 errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')]
1631 if errors:
1632 msg = f'Host {host} ({addr}) failed check(s): {errors}'
1633 raise OrchestratorError(msg)
1634 except ssh.HostConnectionError as e:
1635 raise OrchestratorError(str(e))
b3b6e05e
TL
1636 return ip_addr
1637
e306af50 1638 def _add_host(self, spec):
9f95a23c
TL
1639 # type: (HostSpec) -> str
1640 """
1641 Add a host to be managed by the orchestrator.
1642
1643 :param spec: host specification (HostSpec) for the host to add
1644 """
33c7a0ef 1645 HostSpec.validate(spec)
b3b6e05e
TL
1646 ip_addr = self._check_valid_addr(spec.hostname, spec.addr)
1647 if spec.addr == spec.hostname and ip_addr:
1648 spec.addr = ip_addr
9f95a23c 1649
a4b75251
TL
1650 if spec.hostname in self.inventory and self.inventory.get_addr(spec.hostname) != spec.addr:
1651 self.cache.refresh_all_host_info(spec.hostname)
1652
f38dd50b
TL
1653 if spec.oob:
1654 if not spec.oob.get('addr'):
1655 spec.oob['addr'] = self.oob_default_addr
1656 if not spec.oob.get('port'):
1657 spec.oob['port'] = '443'
1658 host_oob_info = dict()
1659 host_oob_info['addr'] = spec.oob['addr']
1660 host_oob_info['port'] = spec.oob['port']
1661 host_oob_info['username'] = spec.oob['username']
1662 host_oob_info['password'] = spec.oob['password']
1663 self.node_proxy_cache.update_oob(spec.hostname, host_oob_info)
1664
b3b6e05e
TL
1665 # prime crush map?
1666 if spec.location:
1667 self.check_mon_command({
1668 'prefix': 'osd crush add-bucket',
1669 'name': spec.hostname,
1670 'type': 'host',
1671 'args': [f'{k}={v}' for k, v in spec.location.items()],
1672 })
1673
1674 if spec.hostname not in self.inventory:
1675 self.cache.prime_empty_host(spec.hostname)
e306af50 1676 self.inventory.add_host(spec)
1911f103 1677 self.offline_hosts_remove(spec.hostname)
a4b75251
TL
1678 if spec.status == 'maintenance':
1679 self._set_maintenance_healthcheck()
9f95a23c
TL
1680 self.event.set() # refresh stray health check
1681 self.log.info('Added host %s' % spec.hostname)
b3b6e05e 1682 return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr)
9f95a23c 1683
f67539c2 1684 @handle_orch_error
e306af50
TL
1685 def add_host(self, spec: HostSpec) -> str:
1686 return self._add_host(spec)
1687
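# Illustrative flow for adding a host (names and addresses are examples):
#   ceph cephadm get-pub-key > ~/ceph.pub
#   ssh-copy-id -f -i ~/ceph.pub root@host4
#   ceph orch host add host4 10.10.0.104 --labels _admin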
f67539c2 1688 @handle_orch_error
f38dd50b
TL
1689 def hardware_light(self, light_type: str, action: str, hostname: str, device: Optional[str] = None) -> Dict[str, Any]:
1690 try:
1691 result = self.node_proxy.led(light_type=light_type,
1692 action=action,
1693 hostname=hostname,
1694 device=device)
1695 except RuntimeError as e:
1696 self.log.error(e)
1697 raise OrchestratorValidationError(f'Make sure the node-proxy agent is deployed and running on {hostname}')
1698 except HTTPError as e:
1699 self.log.error(e)
1700 raise OrchestratorValidationError(f"http error while querying node-proxy API: {e}")
1701 return result
1702
1703 @handle_orch_error
1704 def hardware_shutdown(self, hostname: str, force: Optional[bool] = False, yes_i_really_mean_it: bool = False) -> str:
1705 if not yes_i_really_mean_it:
1706 raise OrchestratorError("you must pass --yes-i-really-mean-it")
1707
1708 try:
1709 self.node_proxy.shutdown(hostname, force)
1710 except RuntimeError as e:
1711 self.log.error(e)
1712 raise OrchestratorValidationError(f'Make sure the node-proxy agent is deployed and running on {hostname}')
1713 except HTTPError as e:
1714 self.log.error(e)
1715 raise OrchestratorValidationError(f"Can't shutdown node {hostname}: {e}")
1716 return f'Shutdown scheduled on {hostname}'
1717
1718 @handle_orch_error
1719 def hardware_powercycle(self, hostname: str, yes_i_really_mean_it: bool = False) -> str:
1720 if not yes_i_really_mean_it:
1721 raise OrchestratorError("you must pass --yes-i-really-mean-it")
1722
1723 try:
1724 self.node_proxy.powercycle(hostname)
1725 except RuntimeError as e:
1726 self.log.error(e)
1727 raise OrchestratorValidationError(f'Make sure the node-proxy agent is deployed and running on {hostname}')
1728 except HTTPError as e:
1729 self.log.error(e)
1730 raise OrchestratorValidationError(f"Can't perform powercycle on node {hostname}: {e}")
1731 return f'Powercycle scheduled on {hostname}'
1732
1733 @handle_orch_error
1734 def node_proxy_fullreport(self, hostname: Optional[str] = None) -> Dict[str, Any]:
1735 return self.node_proxy_cache.fullreport(hostname=hostname)
1736
1737 @handle_orch_error
1738 def node_proxy_summary(self, hostname: Optional[str] = None) -> Dict[str, Any]:
1739 return self.node_proxy_cache.summary(hostname=hostname)
1740
1741 @handle_orch_error
1742 def node_proxy_firmwares(self, hostname: Optional[str] = None) -> Dict[str, Any]:
1743 return self.node_proxy_cache.firmwares(hostname=hostname)
1744
1745 @handle_orch_error
1746 def node_proxy_criticals(self, hostname: Optional[str] = None) -> Dict[str, Any]:
1747 return self.node_proxy_cache.criticals(hostname=hostname)
1748
1749 @handle_orch_error
1750 def node_proxy_common(self, category: str, hostname: Optional[str] = None) -> Dict[str, Any]:
1751 return self.node_proxy_cache.common(category, hostname=hostname)
1752
1753 @handle_orch_error
1754 def remove_host(self, host: str, force: bool = False, offline: bool = False, rm_crush_entry: bool = False) -> str:
9f95a23c
TL
1755 """
1756 Remove a host from orchestrator management.
1757
1758 :param host: host name
522d829b
TL
1759 :param force: bypass running daemons check
1760 :param offline: remove offline host
:param rm_crush_entry: also remove the host's bucket from the CRUSH map
9f95a23c 1761 """
522d829b
TL
1762
1763 # check if host is offline
1764 host_offline = host in self.offline_hosts
1765
1766 if host_offline and not offline:
20effc67
TL
1767 raise OrchestratorValidationError(
1768 "{} is offline, please use --offline and --force to remove this host. This can potentially cause data loss".format(host))
522d829b
TL
1769
1770 if not host_offline and offline:
20effc67
TL
1771 raise OrchestratorValidationError(
1772 "{} is online, please remove host without --offline.".format(host))
522d829b
TL
1773
1774 if offline and not force:
20effc67 1775 raise OrchestratorValidationError("Removing an offline host requires --force")
522d829b
TL
1776
1777 # check if there are daemons on the host
1778 if not force:
1779 daemons = self.cache.get_daemons_by_host(host)
1780 if daemons:
1781 self.log.warning(f"Blocked {host} removal. Daemons running: {daemons}")
1782
1783 daemons_table = ""
1784 daemons_table += "{:<20} {:<15}\n".format("type", "id")
1785 daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
1786 for d in daemons:
1787 daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)
1788
20effc67
TL
1789 raise OrchestratorValidationError("Not allowed to remove %s from cluster. "
1790 "The following daemons are running in the host:"
1791 "\n%s\nPlease run 'ceph orch host drain %s' to remove daemons from host" % (
1792 host, daemons_table, host))
1793
1794 # check if we're removing the last _admin host
1795 if not force:
aee94f69 1796 p = PlacementSpec(label=SpecialHostLabels.ADMIN)
20effc67
TL
1797 admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
1798 if len(admin_hosts) == 1 and admin_hosts[0] == host:
aee94f69
TL
1799 raise OrchestratorValidationError(f"Host {host} is the last host with the '{SpecialHostLabels.ADMIN}'"
1800 f" label. Please add the '{SpecialHostLabels.ADMIN}' label to a host"
20effc67 1801 " or add --force to this command")
522d829b
TL
1802
1803 def run_cmd(cmd_args: dict) -> None:
1804 ret, out, err = self.mon_command(cmd_args)
1805 if ret != 0:
1806 self.log.debug(f"ran {cmd_args} with mon_command")
1807 self.log.error(
1808 f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
1809 self.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
1810
1811 if offline:
1812 daemons = self.cache.get_daemons_by_host(host)
1813 for d in daemons:
1814 self.log.info(f"removing: {d.name()}")
1815
1816 if d.daemon_type != 'osd':
39ae355f 1817 self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].pre_remove(d)
aee94f69
TL
1818 self.cephadm_services[daemon_type_to_service(
1819 str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
522d829b
TL
1820 else:
1821 cmd_args = {
1822 'prefix': 'osd purge-actual',
1823 'id': int(str(d.daemon_id)),
1824 'yes_i_really_mean_it': True
1825 }
1826 run_cmd(cmd_args)
1827
1828 cmd_args = {
1829 'prefix': 'osd crush rm',
1830 'name': host
1831 }
1832 run_cmd(cmd_args)
1833
f38dd50b
TL
1834 if rm_crush_entry:
1835 try:
1836 self.check_mon_command({
1837 'prefix': 'osd crush remove',
1838 'name': host,
1839 })
1840 except MonCommandFailed as e:
1841 self.log.error(f'Couldn\'t remove host {host} from CRUSH map: {str(e)}')
1842 return (f'Cephadm failed removing host {host}\n'
1843 f'Failed to remove host {host} from the CRUSH map: {str(e)}')
1844
e306af50 1845 self.inventory.rm_host(host)
9f95a23c 1846 self.cache.rm_host(host)
20effc67 1847 self.ssh.reset_con(host)
aee94f69
TL
1848 # if host was in offline host list, we should remove it now.
1849 self.offline_hosts_remove(host)
9f95a23c
TL
1850 self.event.set() # refresh stray health check
1851 self.log.info('Removed host %s' % host)
522d829b 1852 return "Removed {} host '{}'".format('offline' if offline else '', host)
9f95a23c 1853
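# Illustrative usage (host name is an example):
#   ceph orch host drain host4                  # drain daemons first
#   ceph orch host rm host4
#   ceph orch host rm host4 --offline --force   # for an unreachable host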
f67539c2 1854 @handle_orch_error
adb31ebb 1855 def update_host_addr(self, host: str, addr: str) -> str:
b3b6e05e 1856 self._check_valid_addr(host, addr)
e306af50 1857 self.inventory.set_addr(host, addr)
20effc67 1858 self.ssh.reset_con(host)
9f95a23c
TL
1859 self.event.set() # refresh stray health check
1860 self.log.info('Set host %s addr to %s' % (host, addr))
1861 return "Updated host '{}' addr to '{}'".format(host, addr)
1862
f67539c2 1863 @handle_orch_error
9f95a23c
TL
1864 def get_hosts(self):
1865 # type: () -> List[orchestrator.HostSpec]
1866 """
1867 Return a list of hosts managed by the orchestrator.
1868
1869 Notes:
1870 - skip async: manager reads from cache.
1871 """
e306af50 1872 return list(self.inventory.all_specs())
9f95a23c 1873
a4b75251
TL
1874 @handle_orch_error
1875 def get_facts(self, hostname: Optional[str] = None) -> List[Dict[str, Any]]:
1876 """
1877 Return the metadata (gather_facts) of hosts managed by the orchestrator.
1878
1879 Notes:
1880 - skip async: manager reads from cache.
1881 """
1882 if hostname:
1883 return [self.cache.get_facts(hostname)]
1884
1885 return [self.cache.get_facts(hostname) for hostname in self.cache.get_hosts()]
1886
f67539c2 1887 @handle_orch_error
adb31ebb 1888 def add_host_label(self, host: str, label: str) -> str:
e306af50 1889 self.inventory.add_label(host, label)
9f95a23c 1890 self.log.info('Added label %s to host %s' % (label, host))
b3b6e05e 1891 self._kick_serve_loop()
9f95a23c
TL
1892 return 'Added label %s to host %s' % (label, host)
1893
f67539c2 1894 @handle_orch_error
33c7a0ef
TL
1895 def remove_host_label(self, host: str, label: str, force: bool = False) -> str:
1896 # if we remove the _admin label from the only host that has it we could end up
1897 # removing the only instance of the config and keyring and cause issues
aee94f69
TL
1898 if not force and label == SpecialHostLabels.ADMIN:
1899 p = PlacementSpec(label=SpecialHostLabels.ADMIN)
33c7a0ef
TL
1900 admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
1901 if len(admin_hosts) == 1 and admin_hosts[0] == host:
aee94f69
TL
1902 raise OrchestratorValidationError(f"Host {host} is the last host with the '{SpecialHostLabels.ADMIN}'"
1903 f" label.\nRemoving the {SpecialHostLabels.ADMIN} label from this host could cause the removal"
33c7a0ef 1904 " of the last cluster config/keyring managed by cephadm.\n"
aee94f69 1905 f"It is recommended to add the {SpecialHostLabels.ADMIN} label to another host"
33c7a0ef
TL
1906 " before completing this operation.\nIf you're certain this is"
1907 " what you want rerun this command with --force.")
aee94f69
TL
1908 if self.inventory.has_label(host, label):
1909 self.inventory.rm_label(host, label)
1910 msg = f'Removed label {label} from host {host}'
1911 else:
1912 msg = f"Host {host} does not have label '{label}'. Please use 'ceph orch host ls' to list all the labels."
1913 self.log.info(msg)
b3b6e05e 1914 self._kick_serve_loop()
aee94f69 1915 return msg
9f95a23c 1916
f67539c2
TL
1917 def _host_ok_to_stop(self, hostname: str, force: bool = False) -> Tuple[int, str]:
1918 self.log.debug("running host-ok-to-stop checks")
f6b5b4d7 1919 daemons = self.cache.get_daemons()
f67539c2 1920 daemon_map: Dict[str, List[str]] = defaultdict(lambda: [])
f6b5b4d7 1921 for dd in daemons:
f67539c2
TL
1922 assert dd.hostname is not None
1923 assert dd.daemon_type is not None
1924 assert dd.daemon_id is not None
f6b5b4d7
TL
1925 if dd.hostname == hostname:
1926 daemon_map[dd.daemon_type].append(dd.daemon_id)
1927
f67539c2
TL
1928 notifications: List[str] = []
1929 error_notifications: List[str] = []
1930 okay: bool = True
f91f0fd5 1931 for daemon_type, daemon_ids in daemon_map.items():
f67539c2
TL
1932 r = self.cephadm_services[daemon_type_to_service(
1933 daemon_type)].ok_to_stop(daemon_ids, force=force)
f6b5b4d7 1934 if r.retval:
f67539c2
TL
1935 okay = False
1936 # collect error notifications so user can see every daemon causing host
1937 # to not be okay to stop
1938 error_notifications.append(r.stderr)
1939 if r.stdout:
1940 # if extra notifications to print for user, add them to notifications list
1941 notifications.append(r.stdout)
1942
1943 if not okay:
1944 # at least one daemon is not okay to stop
1945 return 1, '\n'.join(error_notifications)
1946
1947 if notifications:
1948 return 0, (f'It is presumed safe to stop host {hostname}. '
1949 + 'Note the following:\n\n' + '\n'.join(notifications))
1950 return 0, f'It is presumed safe to stop host {hostname}'
1951
1952 @handle_orch_error
1953 def host_ok_to_stop(self, hostname: str) -> str:
1954 if hostname not in self.cache.get_hosts():
1955 raise OrchestratorError(f'Cannot find host "{hostname}"')
1956
1957 rc, msg = self._host_ok_to_stop(hostname)
1958 if rc:
1959 raise OrchestratorError(msg, errno=rc)
f6b5b4d7 1960
f6b5b4d7
TL
1961 self.log.info(msg)
1962 return msg
1963
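# Illustrative usage (host name is an example):
#   ceph orch host ok-to-stop host2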
f67539c2
TL
1964 def _set_maintenance_healthcheck(self) -> None:
1965 """Raise/update or clear the maintenance health check as needed"""
1966
1967 in_maintenance = self.inventory.get_host_with_state("maintenance")
1968 if not in_maintenance:
a4b75251 1969 self.remove_health_warning('HOST_IN_MAINTENANCE')
f67539c2
TL
1970 else:
1971 s = "host is" if len(in_maintenance) == 1 else "hosts are"
a4b75251
TL
1972 self.set_health_warning("HOST_IN_MAINTENANCE", f"{len(in_maintenance)} {s} in maintenance mode", 1, [
1973 f"{h} is in maintenance" for h in in_maintenance])
f67539c2
TL
1974
1975 @handle_orch_error
1976 @host_exists()
1e59de90 1977 def enter_host_maintenance(self, hostname: str, force: bool = False, yes_i_really_mean_it: bool = False) -> str:
f67539c2
TL
1978 """ Attempt to place a cluster host in maintenance
1979
1980 Placing a host into maintenance disables the cluster's ceph target in systemd
1981 and stops all ceph daemons. If the host is an osd host we apply the noout flag
1982 for the host subtree in crush to prevent data movement during a host maintenance
1983 window.
1984
1985 :param hostname: (str) name of the host (must match an inventory hostname)
1986
1987 :raises OrchestratorError: Hostname is invalid, host is already in maintenance
1988 """
1e59de90
TL
1989 if yes_i_really_mean_it and not force:
1990 raise OrchestratorError("--force must be passed with --yes-i-really-mean-it")
1991
1992 if len(self.cache.get_hosts()) == 1 and not yes_i_really_mean_it:
f67539c2
TL
1993 raise OrchestratorError("Maintenance feature is not supported on single node clusters")
1994
1995 # if upgrade is active, deny
1e59de90 1996 if self.upgrade.upgrade_state and not yes_i_really_mean_it:
f67539c2
TL
1997 raise OrchestratorError(
1998 f"Unable to place {hostname} in maintenance with upgrade active/paused")
1999
2000 tgt_host = self.inventory._inventory[hostname]
2001 if tgt_host.get("status", "").lower() == "maintenance":
2002 raise OrchestratorError(f"Host {hostname} is already in maintenance")
2003
2004 host_daemons = self.cache.get_daemon_types(hostname)
2005 self.log.debug("daemons on host {}".format(','.join(host_daemons)))
2006 if host_daemons:
2007 # daemons on this host, so check the daemons can be stopped
2008 # and if so, place the host into maintenance by disabling the target
2009 rc, msg = self._host_ok_to_stop(hostname, force)
1e59de90 2010 if rc and not yes_i_really_mean_it:
f67539c2
TL
2011 raise OrchestratorError(
2012 msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc)
2013
2014 # call the host-maintenance function
1e59de90
TL
2015 with self.async_timeout_handler(hostname, 'cephadm host-maintenance enter'):
2016 _out, _err, _code = self.wait_async(
2017 CephadmServe(self)._run_cephadm(
2018 hostname, cephadmNoImage, "host-maintenance",
2019 ["enter"],
2020 error_ok=True))
a4b75251 2021 returned_msg = _err[0].split('\n')[-1]
1e59de90 2022 if (returned_msg.startswith('failed') or returned_msg.startswith('ERROR')) and not yes_i_really_mean_it:
f67539c2
TL
2023 raise OrchestratorError(
2024 f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}")
2025
2026 if "osd" in host_daemons:
2027 crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
2028 rc, out, err = self.mon_command({
2029 'prefix': 'osd set-group',
2030 'flags': 'noout',
2031 'who': [crush_node],
2032 'format': 'json'
2033 })
1e59de90 2034 if rc and not yes_i_really_mean_it:
f67539c2
TL
2035 self.log.warning(
2036 f"maintenance mode request for {hostname} failed to SET the noout group (rc={rc})")
2037 raise OrchestratorError(
2038 f"Unable to set the osds on {hostname} to noout (rc={rc})")
1e59de90 2039 elif not rc:
f67539c2
TL
2040 self.log.info(
2041 f"maintenance mode request for {hostname} has SET the noout group")
2042
2043 # update the host status in the inventory
2044 tgt_host["status"] = "maintenance"
2045 self.inventory._inventory[hostname] = tgt_host
2046 self.inventory.save()
2047
2048 self._set_maintenance_healthcheck()
522d829b 2049 return f'Daemons for Ceph cluster {self._cluster_fsid} stopped on host {hostname}. Host {hostname} moved to maintenance mode'
f67539c2
TL
2050
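# Illustrative usage (host name is an example); --force bypasses the
# ok-to-stop warnings evaluated above:
#   ceph orch host maintenance enter host3
#   ceph orch host maintenance enter host3 --force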
2051 @handle_orch_error
2052 @host_exists()
2053 def exit_host_maintenance(self, hostname: str) -> str:
2054 """Exit maintenance mode and return a host to an operational state
2055
1e59de90 2056 Returning from maintenance will enable the clusters systemd target and
f67539c2
TL
2057 start it, and remove any noout that has been added for the host if the
2058 host has osd daemons
2059
2060 :param hostname: (str) host name
2061
2062 :raises OrchestratorError: Unable to return from maintenance, or unset the
2063 noout flag
2064 """
2065 tgt_host = self.inventory._inventory[hostname]
2066 if tgt_host['status'] != "maintenance":
2067 raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
2068
1e59de90
TL
2069 with self.async_timeout_handler(hostname, 'cephadm host-maintenance exit'):
2070 outs, errs, _code = self.wait_async(
2071 CephadmServe(self)._run_cephadm(hostname, cephadmNoImage,
2072 'host-maintenance', ['exit'], error_ok=True))
a4b75251
TL
2073 returned_msg = errs[0].split('\n')[-1]
2074 if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
f67539c2
TL
2075 raise OrchestratorError(
2076 f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
2077
2078 if "osd" in self.cache.get_daemon_types(hostname):
2079 crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
2080 rc, _out, _err = self.mon_command({
2081 'prefix': 'osd unset-group',
2082 'flags': 'noout',
2083 'who': [crush_node],
2084 'format': 'json'
2085 })
2086 if rc:
2087 self.log.warning(
2088 f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
2089 raise OrchestratorError(f"Unable to set the osds on {hostname} to noout (rc={rc})")
2090 else:
2091 self.log.info(
2092 f"exit maintenance request has UNSET for the noout group on host {hostname}")
2093
2094 # update the host record status
2095 tgt_host['status'] = ""
2096 self.inventory._inventory[hostname] = tgt_host
2097 self.inventory.save()
2098
2099 self._set_maintenance_healthcheck()
2100
2101 return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode"
2102
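# Illustrative usage (host name is an example):
#   ceph orch host maintenance exit host3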
2a845540
TL
2103 @handle_orch_error
2104 @host_exists()
2105 def rescan_host(self, hostname: str) -> str:
2106 """Use cephadm to issue a disk rescan on each HBA
2107
2108 Some HBAs and external enclosures don't automatically register
2109 device insertion with the kernel, so for these scenarios we need
2110 to manually rescan
2111
2112 :param hostname: (str) host name
2113 """
2114 self.log.info(f'disk rescan request sent to host "{hostname}"')
1e59de90
TL
2115 with self.async_timeout_handler(hostname, 'cephadm disk-rescan'):
2116 _out, _err, _code = self.wait_async(
2117 CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan",
2118 [], no_fsid=True, error_ok=True))
2a845540
TL
2119 if not _err:
2120 raise OrchestratorError('Unexpected response from cephadm disk-rescan call')
2121
2122 msg = _err[0].split('\n')[-1]
2123 log_msg = f'disk rescan: {msg}'
2124 if msg.upper().startswith('OK'):
2125 self.log.info(log_msg)
2126 else:
2127 self.log.warning(log_msg)
2128
2129 return f'{msg}'
2130
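# Illustrative usage (host name is an example):
#   ceph orch host rescan host1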
f91f0fd5
TL
2131 def get_minimal_ceph_conf(self) -> str:
2132 _, config, _ = self.check_mon_command({
f6b5b4d7
TL
2133 "prefix": "config generate-minimal-conf",
2134 })
f91f0fd5
TL
2135 extra = self.extra_ceph_conf().conf
2136 if extra:
1e59de90
TL
2137 try:
2138 config = self._combine_confs(config, extra)
2139 except Exception as e:
2140 self.log.error(f'Failed to add extra ceph conf settings to minimal ceph conf: {e}')
f91f0fd5 2141 return config
f6b5b4d7 2142
1e59de90
TL
2143 def _combine_confs(self, conf1: str, conf2: str) -> str:
2144 section_to_option: Dict[str, List[str]] = {}
2145 final_conf: str = ''
2146 for conf in [conf1, conf2]:
2147 if not conf:
2148 continue
2149 section = ''
2150 for line in conf.split('\n'):
2151 if line.strip().startswith('#') or not line.strip():
2152 continue
2153 if line.strip().startswith('[') and line.strip().endswith(']'):
2154 section = line.strip().replace('[', '').replace(']', '')
2155 if section not in section_to_option:
2156 section_to_option[section] = []
2157 else:
2158 section_to_option[section].append(line.strip())
2159
2160 first_section = True
2161 for section, options in section_to_option.items():
2162 if not first_section:
2163 final_conf += '\n'
2164 final_conf += f'[{section}]\n'
2165 for option in options:
2166 final_conf += f'{option}\n'
2167 first_section = False
2168
2169 return final_conf
2170
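# Sketch of how _combine_confs() merges two fragments (values illustrative):
#   conf1 = "[global]\nfsid = abc"
#   conf2 = "[global]\ndebug_ms = 1\n[osd]\nosd_max_backfills = 2"
# yields options grouped by section, sections in first-seen order:
#   [global]
#   fsid = abc
#   debug_ms = 1
#
#   [osd]
#   osd_max_backfills = 2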
adb31ebb 2171 def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None:
f6b5b4d7
TL
2172 if filter_host:
2173 self.cache.invalidate_host_daemons(filter_host)
2174 else:
2175 for h in self.cache.get_hosts():
2176 # Also discover daemons deployed manually
2177 self.cache.invalidate_host_daemons(h)
2178
2179 self._kick_serve_loop()
2180
f67539c2 2181 @handle_orch_error
f6b5b4d7
TL
2182 def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
2183 refresh: bool = False) -> List[orchestrator.ServiceDescription]:
9f95a23c 2184 if refresh:
f6b5b4d7 2185 self._invalidate_daemons_and_kick_serve()
b3b6e05e 2186 self.log.debug('Kicked serve() loop to refresh all services')
f6b5b4d7 2187
f6b5b4d7 2188 sm: Dict[str, orchestrator.ServiceDescription] = {}
f67539c2
TL
2189
2190 # known services
2191 for nm, spec in self.spec_store.all_specs.items():
2192 if service_type is not None and service_type != spec.service_type:
2193 continue
2194 if service_name is not None and service_name != nm:
2195 continue
20effc67
TL
2196
2197 if spec.service_type != 'osd':
2198 size = spec.placement.get_target_count(self.cache.get_schedulable_hosts())
2199 else:
2200 # osd counting is special
2201 size = 0
2202
f67539c2
TL
2203 sm[nm] = orchestrator.ServiceDescription(
2204 spec=spec,
20effc67 2205 size=size,
f67539c2
TL
2206 running=0,
2207 events=self.events.get_for_service(spec.service_name()),
2208 created=self.spec_store.spec_created[nm],
2209 deleted=self.spec_store.spec_deleted.get(nm, None),
2210 virtual_ip=spec.get_virtual_ip(),
2211 ports=spec.get_port_start(),
2212 )
f67539c2
TL
2213 if spec.service_type == 'ingress':
2214 # ingress has 2 daemons running per host
1e59de90
TL
2215 # but only if it's the full ingress service, not for keepalive-only
2216 if not cast(IngressSpec, spec).keepalive_only:
2217 sm[nm].size *= 2
f67539c2
TL
2218
2219 # factor daemons into status
1911f103 2220 for h, dm in self.cache.get_daemons_with_volatile_status():
9f95a23c 2221 for name, dd in dm.items():
f67539c2
TL
2222 assert dd.hostname is not None, f'no hostname for {dd!r}'
2223 assert dd.daemon_type is not None, f'no daemon_type for {dd!r}'
2224
9f95a23c 2225 n: str = dd.service_name()
f67539c2
TL
2226
2227 if (
2228 service_type
b3b6e05e 2229 and service_type != daemon_type_to_service(dd.daemon_type)
f67539c2
TL
2230 ):
2231 continue
9f95a23c
TL
2232 if service_name and service_name != n:
2233 continue
f67539c2
TL
2234
2235 if n not in sm:
2236 # new unmanaged service
1911f103
TL
2237 spec = ServiceSpec(
2238 unmanaged=True,
f67539c2 2239 service_type=daemon_type_to_service(dd.daemon_type),
1911f103 2240 service_id=dd.service_id(),
1911f103 2241 )
9f95a23c 2242 sm[n] = orchestrator.ServiceDescription(
9f95a23c
TL
2243 last_refresh=dd.last_refresh,
2244 container_image_id=dd.container_image_id,
2245 container_image_name=dd.container_image_name,
2246 spec=spec,
f67539c2 2247 size=0,
9f95a23c 2248 )
f67539c2
TL
2249
2250 if dd.status == DaemonDescriptionStatus.running:
9f95a23c 2251 sm[n].running += 1
f67539c2
TL
2252 if dd.daemon_type == 'osd':
2253 # The osd count can't be determined by the Placement spec.
2254 # Showing an actual/expected representation cannot be determined
2255 # here. So we're setting running = size for now.
2256 sm[n].size += 1
2257 if (
2258 not sm[n].last_refresh
2259 or not dd.last_refresh
2260 or dd.last_refresh < sm[n].last_refresh # type: ignore
2261 ):
9f95a23c 2262 sm[n].last_refresh = dd.last_refresh
f67539c2 2263
1911f103 2264 return list(sm.values())
9f95a23c 2265
f67539c2 2266 @handle_orch_error
f6b5b4d7
TL
2267 def list_daemons(self,
2268 service_name: Optional[str] = None,
2269 daemon_type: Optional[str] = None,
2270 daemon_id: Optional[str] = None,
2271 host: Optional[str] = None,
2272 refresh: bool = False) -> List[orchestrator.DaemonDescription]:
9f95a23c 2273 if refresh:
f6b5b4d7 2274 self._invalidate_daemons_and_kick_serve(host)
b3b6e05e 2275 self.log.debug('Kicked serve() loop to refresh all daemons')
f6b5b4d7 2276
9f95a23c 2277 result = []
1911f103 2278 for h, dm in self.cache.get_daemons_with_volatile_status():
9f95a23c
TL
2279 if host and h != host:
2280 continue
2281 for name, dd in dm.items():
801d1391
TL
2282 if daemon_type is not None and daemon_type != dd.daemon_type:
2283 continue
2284 if daemon_id is not None and daemon_id != dd.daemon_id:
9f95a23c 2285 continue
801d1391 2286 if service_name is not None and service_name != dd.service_name():
9f95a23c 2287 continue
b3b6e05e
TL
2288 if not dd.memory_request and dd.daemon_type in ['osd', 'mon']:
2289 dd.memory_request = cast(Optional[int], self.get_foreign_ceph_option(
2290 dd.name(),
2291 f"{dd.daemon_type}_memory_target"
2292 ))
9f95a23c
TL
2293 result.append(dd)
2294 return result
2295
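# Illustrative usage of the daemon listing (filters and flags are examples):
#   ceph orch ps
#   ceph orch ps --daemon-type osd --refresh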
f67539c2 2296 @handle_orch_error
adb31ebb 2297 def service_action(self, action: str, service_name: str) -> List[str]:
2a845540
TL
2298 if service_name not in self.spec_store.all_specs.keys():
2299 raise OrchestratorError(f'Invalid service name "{service_name}".'
2300 + ' View currently running services using "ceph orch ls"')
adb31ebb 2301 dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
f67539c2
TL
2302 if not dds:
2303 raise OrchestratorError(f'No daemons exist under service name "{service_name}".'
2304 + ' View currently running services using "ceph orch ls"')
522d829b
TL
2305 if action == 'stop' and service_name.split('.')[0].lower() in ['mgr', 'mon', 'osd']:
2306 return [f'Stopping entire {service_name} service is prohibited.']
9f95a23c 2307 self.log.info('%s service %s' % (action.capitalize(), service_name))
adb31ebb
TL
2308 return [
2309 self._schedule_daemon_action(dd.name(), action)
2310 for dd in dds
2311 ]
f6b5b4d7 2312
39ae355f
TL
2313 def _rotate_daemon_key(self, daemon_spec: CephadmDaemonDeploySpec) -> str:
2314 self.log.info(f'Rotating authentication key for {daemon_spec.name()}')
2315 rc, out, err = self.mon_command({
2316 'prefix': 'auth get-or-create-pending',
2317 'entity': daemon_spec.entity_name(),
2318 'format': 'json',
2319 })
2320 j = json.loads(out)
2321 pending_key = j[0]['pending_key']
2322
2323 # deploy a new keyring file
2324 if daemon_spec.daemon_type != 'osd':
2325 daemon_spec = self.cephadm_services[daemon_type_to_service(
2326 daemon_spec.daemon_type)].prepare_create(daemon_spec)
1e59de90
TL
2327 with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
2328 self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True))
39ae355f
TL
2329
2330 # try to be clever, or fall back to restarting the daemon
2331 rc = -1
2332 if daemon_spec.daemon_type == 'osd':
2333 rc, out, err = self.tool_exec(
2334 args=['ceph', 'tell', daemon_spec.name(), 'rotate-stored-key', '-i', '-'],
2335 stdin=pending_key.encode()
2336 )
2337 if not rc:
2338 rc, out, err = self.tool_exec(
2339 args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
2340 stdin=pending_key.encode()
2341 )
2342 elif daemon_spec.daemon_type == 'mds':
2343 rc, out, err = self.tool_exec(
2344 args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
2345 stdin=pending_key.encode()
2346 )
2347 elif (
2348 daemon_spec.daemon_type == 'mgr'
2349 and daemon_spec.daemon_id == self.get_mgr_id()
2350 ):
2351 rc, out, err = self.tool_exec(
2352 args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
2353 stdin=pending_key.encode()
2354 )
2355 if rc:
2356 self._daemon_action(daemon_spec, 'restart')
2357
2358 return f'Rotated key for {daemon_spec.name()}'
2359
f67539c2
TL
2360 def _daemon_action(self,
2361 daemon_spec: CephadmDaemonDeploySpec,
2362 action: str,
2363 image: Optional[str] = None) -> str:
2364 self._daemon_action_set_image(action, image, daemon_spec.daemon_type,
2365 daemon_spec.daemon_id)
f6b5b4d7 2366
b3b6e05e
TL
2367 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(daemon_spec.daemon_type,
2368 daemon_spec.daemon_id):
f67539c2
TL
2369 self.mgr_service.fail_over()
2370 return '' # unreachable
9f95a23c 2371
39ae355f
TL
2372 if action == 'rotate-key':
2373 return self._rotate_daemon_key(daemon_spec)
2374
f67539c2
TL
2375 if action == 'redeploy' or action == 'reconfig':
2376 if daemon_spec.daemon_type != 'osd':
2377 daemon_spec = self.cephadm_services[daemon_type_to_service(
2378 daemon_spec.daemon_type)].prepare_create(daemon_spec)
2a845540
TL
2379 else:
2380 # for OSDs, we still need to update config, just not carry out the full
2381 # prepare_create function
39ae355f
TL
2382 daemon_spec.final_config, daemon_spec.deps = self.osd_service.generate_config(
2383 daemon_spec)
1e59de90
TL
2384 with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
2385 return self.wait_async(
2386 CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')))
9f95a23c
TL
2387
2388 actions = {
2389 'start': ['reset-failed', 'start'],
2390 'stop': ['stop'],
2391 'restart': ['reset-failed', 'restart'],
2392 }
f6b5b4d7 2393 name = daemon_spec.name()
9f95a23c 2394 for a in actions[action]:
f6b5b4d7 2395 try:
1e59de90
TL
2396 with self.async_timeout_handler(daemon_spec.host, f'cephadm unit --name {name}'):
2397 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
2398 daemon_spec.host, name, 'unit',
2399 ['--name', name, a]))
f6b5b4d7 2400 except Exception:
f67539c2 2401 self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed')
f6b5b4d7
TL
2402 self.cache.invalidate_host_daemons(daemon_spec.host)
2403 msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
2404 self.events.for_daemon(name, 'INFO', msg)
2405 return msg
9f95a23c 2406
adb31ebb 2407 def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
f91f0fd5
TL
2408 if image is not None:
2409 if action != 'redeploy':
2410 raise OrchestratorError(
2411 f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
a4b75251 2412 if daemon_type not in CEPH_IMAGE_TYPES:
f91f0fd5
TL
2413 raise OrchestratorError(
2414 f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
a4b75251 2415 f'types are: {", ".join(CEPH_IMAGE_TYPES)}')
f91f0fd5
TL
2416
2417 self.check_mon_command({
2418 'prefix': 'config set',
2419 'name': 'container_image',
2420 'value': image,
2421 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
2422 })
2423
f67539c2 2424 @handle_orch_error
f91f0fd5 2425 def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
f6b5b4d7 2426 d = self.cache.get_daemon(daemon_name)
f67539c2
TL
2427 assert d.daemon_type is not None
2428 assert d.daemon_id is not None
f6b5b4d7 2429
b3b6e05e 2430 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(d.daemon_type, d.daemon_id) \
f91f0fd5
TL
2431 and not self.mgr_service.mgr_map_has_standby():
2432 raise OrchestratorError(
2433 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
2434
39ae355f
TL
2435 if action == 'rotate-key':
2436 if d.daemon_type not in ['mgr', 'osd', 'mds',
2437 'rgw', 'crash', 'nfs', 'rbd-mirror', 'iscsi']:
2438 raise OrchestratorError(
2439 f'key rotation not supported for {d.daemon_type}'
2440 )
2441
f91f0fd5
TL
2442 self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)
2443
2444 self.log.info(f'Schedule {action} daemon {daemon_name}')
2445 return self._schedule_daemon_action(daemon_name, action)
2446
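# Illustrative per-daemon actions routed through daemon_action() above
# (daemon names are examples):
#   ceph orch daemon restart osd.7
#   ceph orch daemon rotate-key rgw.foo.host1.abcdef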
2447 def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
2448 return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
2449
1e59de90
TL
2450 def get_active_mgr(self) -> DaemonDescription:
2451 return self.mgr_service.get_active_daemon(self.cache.get_daemons_by_type('mgr'))
2452
f67539c2
TL
2453 def get_active_mgr_digests(self) -> List[str]:
2454 digests = self.mgr_service.get_active_daemon(
2455 self.cache.get_daemons_by_type('mgr')).container_image_digests
2456 return digests if digests else []
2457
adb31ebb 2458 def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
f91f0fd5 2459 dd = self.cache.get_daemon(daemon_name)
f67539c2
TL
2460 assert dd.daemon_type is not None
2461 assert dd.daemon_id is not None
2462 assert dd.hostname is not None
b3b6e05e 2463 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
f91f0fd5
TL
2464 and not self.mgr_service.mgr_map_has_standby():
2465 raise OrchestratorError(
2466 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
2467 self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
39ae355f 2468 self.cache.save_host(dd.hostname)
f91f0fd5
TL
2469 msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
2470 self._kick_serve_loop()
2471 return msg
9f95a23c 2472
f67539c2 2473 @handle_orch_error
9f95a23c 2474 def remove_daemons(self, names):
e306af50 2475 # type: (List[str]) -> List[str]
9f95a23c
TL
2476 args = []
2477 for host, dm in self.cache.daemons.items():
2478 for name in names:
2479 if name in dm:
2480 args.append((name, host))
2481 if not args:
2482 raise OrchestratorError('Unable to find daemon(s) %s' % (names))
f67539c2 2483 self.log.info('Remove daemons %s' % ' '.join([a[0] for a in args]))
9f95a23c
TL
2484 return self._remove_daemons(args)
2485
f67539c2 2486 @handle_orch_error
a4b75251 2487 def remove_service(self, service_name: str, force: bool = False) -> str:
9f95a23c 2488 self.log.info('Remove service %s' % service_name)
e306af50 2489 self._trigger_preview_refresh(service_name=service_name)
f67539c2
TL
2490 if service_name in self.spec_store:
2491 if self.spec_store[service_name].spec.service_type in ('mon', 'mgr'):
2492 return f'Unable to remove {service_name} service.\n' \
2493 f'Note, you might want to mark the {service_name} service as "unmanaged"'
33c7a0ef
TL
2494 else:
2495 return f"Invalid service '{service_name}'. Use 'ceph orch ls' to list available services.\n"
b3b6e05e 2496
a4b75251
TL
2497 # Report list of affected OSDs?
2498 if not force and service_name.startswith('osd.'):
2499 osds_msg = {}
b3b6e05e
TL
2500 for h, dm in self.cache.get_daemons_with_volatile_status():
2501 osds_to_remove = []
2502 for name, dd in dm.items():
20effc67 2503 if dd.daemon_type == 'osd' and dd.service_name() == service_name:
b3b6e05e
TL
2504 osds_to_remove.append(str(dd.daemon_id))
2505 if osds_to_remove:
2506 osds_msg[h] = osds_to_remove
b3b6e05e 2507 if osds_msg:
a4b75251
TL
2508 msg = ''
2509 for h, ls in osds_msg.items():
2510 msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}'
20effc67
TL
2511 raise OrchestratorError(
2512 f'If {service_name} is removed then the following OSDs will remain; use --force to proceed anyway\n{msg}')
a4b75251
TL
2513
2514 found = self.spec_store.rm(service_name)
2515 if found and service_name.startswith('osd.'):
2516 self.spec_store.finally_rm(service_name)
2517 self._kick_serve_loop()
2518 return f'Removed service {service_name}'
9f95a23c 2519
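# Illustrative usage (service names are examples):
#   ceph orch rm rgw.foo
#   ceph orch rm osd.my_drivegroup --force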
f67539c2 2520 @handle_orch_error
adb31ebb 2521 def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
9f95a23c
TL
2522 """
2523 Return the storage inventory of hosts matching the given filter.
2524
2525 :param host_filter: host filter
2526
2527 TODO:
2528 - add filtering by label
2529 """
2530 if refresh:
f6b5b4d7
TL
2531 if host_filter and host_filter.hosts:
2532 for h in host_filter.hosts:
b3b6e05e 2533 self.log.debug(f'will refresh {h} devs')
f6b5b4d7 2534 self.cache.invalidate_host_devices(h)
20effc67 2535 self.cache.invalidate_host_networks(h)
9f95a23c 2536 else:
f6b5b4d7 2537 for h in self.cache.get_hosts():
b3b6e05e 2538 self.log.debug(f'will refresh {h} devs')
f6b5b4d7 2539 self.cache.invalidate_host_devices(h)
20effc67 2540 self.cache.invalidate_host_networks(h)
f6b5b4d7
TL
2541
2542 self.event.set()
b3b6e05e 2543 self.log.debug('Kicked serve() loop to refresh devices')
9f95a23c
TL
2544
2545 result = []
2546 for host, dls in self.cache.devices.items():
f6b5b4d7 2547 if host_filter and host_filter.hosts and host not in host_filter.hosts:
9f95a23c
TL
2548 continue
2549 result.append(orchestrator.InventoryHost(host,
2550 inventory.Devices(dls)))
2551 return result
2552
f67539c2 2553 @handle_orch_error
adb31ebb 2554 def zap_device(self, host: str, path: str) -> str:
a4b75251
TL
2555 """Zap a device on a managed host.
2556
2557 Use ceph-volume zap to return a device to an unused/free state
2558
2559 Args:
2560 host (str): hostname of the cluster host
2561 path (str): device path
2562
2563 Raises:
2564 OrchestratorError: host is not a cluster host
2565 OrchestratorError: host is in maintenance and therefore unavailable
2566 OrchestratorError: device path not found on the host
2567 OrchestratorError: device is known to a different ceph cluster
2568 OrchestratorError: device holds active osd
2569 OrchestratorError: device cache hasn't been populated yet..
2570
2571 Returns:
2572 str: output from the zap command
2573 """
2574
9f95a23c 2575 self.log.info('Zap device %s:%s' % (host, path))
a4b75251
TL
2576
2577 if host not in self.inventory.keys():
2578 raise OrchestratorError(
2579 f"Host '{host}' is not a member of the cluster")
2580
2581 host_info = self.inventory._inventory.get(host, {})
2582 if host_info.get('status', '').lower() == 'maintenance':
2583 raise OrchestratorError(
2584 f"Host '{host}' is in maintenance mode, which prevents any actions against it.")
2585
2586 if host not in self.cache.devices:
2587 raise OrchestratorError(
2588 f"Host '{host} hasn't been scanned yet to determine it's inventory. Please try again later.")
2589
2590 host_devices = self.cache.devices[host]
2591 path_found = False
2592 osd_id_list: List[str] = []
2593
2594 for dev in host_devices:
2595 if dev.path == path:
a4b75251
TL
2596 path_found = True
2597 break
2598 if not path_found:
2599 raise OrchestratorError(
2600 f"Device path '{path}' not found on host '{host}'")
2601
2602 if osd_id_list:
2603 dev_name = os.path.basename(path)
2604 active_osds: List[str] = []
2605 for osd_id in osd_id_list:
2606 metadata = self.get_metadata('osd', str(osd_id))
2607 if metadata:
2608 if metadata.get('hostname', '') == host and dev_name in metadata.get('devices', '').split(','):
2609 active_osds.append("osd." + osd_id)
2610 if active_osds:
2611 raise OrchestratorError(
2612 f"Unable to zap: device '{path}' on {host} has {len(active_osds)} active "
2613 f"OSD{'s' if len(active_osds) > 1 else ''}"
2614 f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.")
2615
1e59de90
TL
2616 cv_args = ['--', 'lvm', 'zap', '--destroy', path]
2617 with self.async_timeout_handler(host, f'cephadm ceph-volume {" ".join(cv_args)}'):
2618 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
2619 host, 'osd', 'ceph-volume', cv_args, error_ok=True))
a4b75251 2620
9f95a23c 2621 self.cache.invalidate_host_devices(host)
20effc67 2622 self.cache.invalidate_host_networks(host)
9f95a23c
TL
2623 if code:
2624 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
a4b75251
TL
2625 msg = f'zap successful for {path} on {host}'
2626 self.log.info(msg)
2627
2628 return msg + '\n'
9f95a23c 2629
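# Illustrative usage (host and device path are examples):
#   ceph orch device zap host2 /dev/sdb --force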
f67539c2 2630 @handle_orch_error
f91f0fd5
TL
2631 def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
2632 """
2633 Blink a device light. Calling something like::
2634
2635 lsmcli local-disk-ident-led-on --path $path
2636
2637 If you must, you can customize this via::
2638
2639 ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
2640 ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'
2641
2642 See templates/blink_device_light_cmd.j2
2643 """
e306af50 2644 @forall_hosts
adb31ebb 2645 def blink(host: str, dev: str, path: str) -> str:
f91f0fd5
TL
2646 cmd_line = self.template.render('blink_device_light_cmd.j2',
2647 {
2648 'on': on,
2649 'ident_fault': ident_fault,
2650 'dev': dev,
2651 'path': path
2652 },
2653 host=host)
2654 cmd_args = shlex.split(cmd_line)
2655
1e59de90
TL
2656 with self.async_timeout_handler(host, f'cephadm shell -- {" ".join(cmd_args)}'):
2657 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
2658 host, 'osd', 'shell', ['--'] + cmd_args,
2659 error_ok=True))
9f95a23c 2660 if code:
f6b5b4d7 2661 raise OrchestratorError(
9f95a23c 2662 'Unable to affect %s light for %s:%s. Command: %s' % (
f91f0fd5 2663 ident_fault, host, dev, ' '.join(cmd_args)))
9f95a23c
TL
2664 self.log.info('Set %s light for %s:%s %s' % (
2665 ident_fault, host, dev, 'on' if on else 'off'))
2666 return "Set %s light for %s:%s %s" % (
2667 ident_fault, host, dev, 'on' if on else 'off')
2668
2669 return blink(locs)
2670
2671 def get_osd_uuid_map(self, only_up=False):
1911f103 2672 # type: (bool) -> Dict[str, str]
9f95a23c
TL
2673 osd_map = self.get('osd_map')
2674 r = {}
2675 for o in osd_map['osds']:
2676 # only include OSDs that have ever started in this map. this way
2677 # an interrupted osd create can be repeated and succeed the second
2678 # time around.
1911f103
TL
2679 osd_id = o.get('osd')
2680 if osd_id is None:
2681 raise OrchestratorError("Could not retrieve osd_id from osd_map")
20effc67 2682 if not only_up:
1911f103 2683 r[str(osd_id)] = o.get('uuid', '')
9f95a23c
TL
2684 return r
2685
20effc67
TL
2686 def get_osd_by_id(self, osd_id: int) -> Optional[Dict[str, Any]]:
2687 osd = [x for x in self.get('osd_map')['osds']
2688 if x['osd'] == osd_id]
2689
2690 if len(osd) != 1:
2691 return None
2692
2693 return osd[0]
2694
e306af50
TL
2695 def _trigger_preview_refresh(self,
2696 specs: Optional[List[DriveGroupSpec]] = None,
f6b5b4d7
TL
2697 service_name: Optional[str] = None,
2698 ) -> None:
2699 # Only trigger a refresh when a spec has changed
2700 trigger_specs = []
2701 if specs:
2702 for spec in specs:
2703 preview_spec = self.spec_store.spec_preview.get(spec.service_name())
2704 # the to-be-preview spec != the actual spec, this means we need to
2705 # trigger a refresh, if the spec has been removed (==None) we need to
2706 # refresh as well.
2707 if not preview_spec or spec != preview_spec:
2708 trigger_specs.append(spec)
2709 if service_name:
2710 trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
2711 if not any(trigger_specs):
2712 return None
2713
2714 refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
e306af50
TL
2715 for host in refresh_hosts:
2716 self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
2717 self.cache.osdspec_previews_refresh_queue.append(host)
2718
f67539c2 2719 @handle_orch_error
f6b5b4d7
TL
2720 def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
2721 """
2722 Deprecated. Please use `apply()` instead.
2723
1e59de90 2724 Keeping this around to be compatible to mgr/dashboard
f6b5b4d7 2725 """
9f95a23c
TL
2726 return [self._apply(spec) for spec in specs]
2727
f67539c2 2728 @handle_orch_error
f6b5b4d7 2729 def create_osds(self, drive_group: DriveGroupSpec) -> str:
33c7a0ef
TL
2730 hosts: List[HostSpec] = self.inventory.all_specs()
2731 filtered_hosts: List[str] = drive_group.placement.filter_matching_hostspecs(hosts)
2732 if not filtered_hosts:
2733 return "Invalid 'host:device' spec: host not found in cluster. Please check 'ceph orch host ls' for available hosts"
f6b5b4d7 2734 return self.osd_service.create_from_spec(drive_group)
9f95a23c 2735
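# Illustrative usage (host and device are examples):
#   ceph orch daemon add osd host1:/dev/sdb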
f6b5b4d7
TL
2736 def _preview_osdspecs(self,
2737 osdspecs: Optional[List[DriveGroupSpec]] = None
adb31ebb 2738 ) -> dict:
f6b5b4d7
TL
2739 if not osdspecs:
2740 return {'n/a': [{'error': True,
2741 'message': 'No OSDSpec or matching hosts found.'}]}
2742 matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
e306af50
TL
2743 if not matching_hosts:
2744 return {'n/a': [{'error': True,
2745 'message': 'No OSDSpec or matching hosts found.'}]}
adb31ebb 2746 # Is any host still loading previews or still in the queue to be previewed
e306af50 2747 pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
adb31ebb 2748 if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts):
e306af50
TL
2749 # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
2750 return {'n/a': [{'error': True,
2751 'message': 'Preview data is being generated.. '
f6b5b4d7
TL
2752 'Please re-run this command in a bit.'}]}
2753 # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
2754 previews_for_specs = {}
2755 for host, raw_reports in self.cache.osdspec_previews.items():
2756 if host not in matching_hosts:
2757 continue
2758 osd_reports = []
2759 for osd_report in raw_reports:
2760 if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
2761 osd_reports.append(osd_report)
2762 previews_for_specs.update({host: osd_reports})
2763 return previews_for_specs
9f95a23c 2764
f67539c2
TL
2765 def _calc_daemon_deps(self,
2766 spec: Optional[ServiceSpec],
2767 daemon_type: str,
2768 daemon_id: str) -> List[str]:
1e59de90
TL
2769
2770 def get_daemon_names(daemons: List[str]) -> List[str]:
2771 daemon_names = []
2772 for daemon_type in daemons:
2773 for dd in self.cache.get_daemons_by_type(daemon_type):
2774 daemon_names.append(dd.name())
2775 return daemon_names
2776
aee94f69
TL
2777 alertmanager_user, alertmanager_password = self._get_alertmanager_credentials()
2778 prometheus_user, prometheus_password = self._get_prometheus_credentials()
2779
9f95a23c 2780 deps = []
f67539c2
TL
2781 if daemon_type == 'haproxy':
2782 # because cephadm creates new daemon instances whenever
2783 # port or ip changes, identifying daemons by name is
2784 # sufficient to detect changes.
2785 if not spec:
2786 return []
2787 ingress_spec = cast(IngressSpec, spec)
2788 assert ingress_spec.backend_service
2789 daemons = self.cache.get_daemons_by_service(ingress_spec.backend_service)
2790 deps = [d.name() for d in daemons]
2791 elif daemon_type == 'keepalived':
2792 # because cephadm creates new daemon instances whenever
2793 # port or ip changes, identifying daemons by name is
2794 # sufficient to detect changes.
2795 if not spec:
2796 return []
2797 daemons = self.cache.get_daemons_by_service(spec.service_name())
2798 deps = [d.name() for d in daemons if d.daemon_type == 'haproxy']
20effc67
TL
2799 elif daemon_type == 'agent':
2800 root_cert = ''
2801 server_port = ''
2802 try:
1e59de90
TL
2803 server_port = str(self.http_server.agent.server_port)
2804 root_cert = self.http_server.agent.ssl_certs.get_root_cert()
20effc67
TL
2805 except Exception:
2806 pass
2807 deps = sorted([self.get_mgr_ip(), server_port, root_cert,
2808 str(self.device_enhanced_scan)])
f38dd50b
TL
2809 elif daemon_type == 'node-proxy':
2810 root_cert = ''
2811 server_port = ''
2812 try:
2813 server_port = str(self.http_server.agent.server_port)
2814 root_cert = self.http_server.agent.ssl_certs.get_root_cert()
2815 except Exception:
2816 pass
2817 deps = sorted([self.get_mgr_ip(), server_port, root_cert])
a4b75251 2818 elif daemon_type == 'iscsi':
39ae355f
TL
2819 if spec:
2820 iscsi_spec = cast(IscsiServiceSpec, spec)
2821 deps = [self.iscsi_service.get_trusted_ips(iscsi_spec)]
2822 else:
2823 deps = [self.get_mgr_ip()]
1e59de90
TL
2824 elif daemon_type == 'prometheus':
2825 # for prometheus we add the active mgr as an explicit dependency,
2826 # this way we force a redeploy after a mgr failover
2827 deps.append(self.get_active_mgr().name())
2828 deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
2829 deps.append(str(self.service_discovery_port))
2830 # prometheus yaml configuration file (generated by prometheus.yml.j2) contains
2831 # a scrape_configs section for each service type. This should be included only
2832 # when at least one daemon of the corresponding service is running. Therefore,
2833 # an explicit dependency is added for each service-type to force a reconfig
2834 # whenever the number of daemons for those service-type changes from 0 to greater
2835 # than zero and vice versa.
aee94f69
TL
2836 deps += [s for s in ['node-exporter', 'alertmanager']
2837 if self.cache.get_daemons_by_service(s)]
1e59de90
TL
2838 if len(self.cache.get_daemons_by_type('ingress')) > 0:
2839 deps.append('ingress')
2840 # add dependency on ceph-exporter daemons
2841 deps += [d.name() for d in self.cache.get_daemons_by_service('ceph-exporter')]
2842 if self.secure_monitoring_stack:
aee94f69
TL
2843 if prometheus_user and prometheus_password:
2844 deps.append(f'{hash(prometheus_user + prometheus_password)}')
2845 if alertmanager_user and alertmanager_password:
2846 deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
1e59de90
TL
2847 elif daemon_type == 'grafana':
2848 deps += get_daemon_names(['prometheus', 'loki'])
aee94f69
TL
2849 if self.secure_monitoring_stack and prometheus_user and prometheus_password:
2850 deps.append(f'{hash(prometheus_user + prometheus_password)}')
1e59de90
TL
2851 elif daemon_type == 'alertmanager':
2852 deps += get_daemon_names(['mgr', 'alertmanager', 'snmp-gateway'])
aee94f69
TL
2853 if self.secure_monitoring_stack and alertmanager_user and alertmanager_password:
2854 deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
1e59de90
TL
2855 elif daemon_type == 'promtail':
2856 deps += get_daemon_names(['loki'])
f38dd50b
TL
2857 elif daemon_type == JaegerAgentService.TYPE:
2858 for dd in self.cache.get_daemons_by_type(JaegerCollectorService.TYPE):
2859 assert dd.hostname is not None
2860 port = dd.ports[0] if dd.ports else JaegerCollectorService.DEFAULT_SERVICE_PORT
2861 deps.append(build_url(host=dd.hostname, port=port).lstrip('/'))
2862 deps = sorted(deps)
f67539c2 2863 else:
1e59de90
TL
2864 # TODO(redo): some error message!
2865 pass
2866
2867 if daemon_type in ['prometheus', 'node-exporter', 'alertmanager', 'grafana']:
2868 deps.append(f'secure_monitoring_stack:{self.secure_monitoring_stack}')
2869
9f95a23c
TL
2870 return sorted(deps)
2871
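# --- Illustrative sketch (not part of the upstream module) ----------------------
# The dependency list returned above is only used as a change marker: if the
# sorted deps differ from what a daemon was last deployed with, cephadm schedules
# a reconfig/redeploy. A minimal, self-contained sketch of that idea; the helper
# names (deps_token, needs_reconfig) are hypothetical.
import hashlib
from typing import List


def deps_token(deps: List[str]) -> str:
    """Stable digest for a sorted dependency list."""
    return hashlib.sha1(';'.join(sorted(deps)).encode('utf-8')).hexdigest()


def needs_reconfig(last_deps: List[str], current_deps: List[str]) -> bool:
    """A daemon needs a reconfig/redeploy when its dependency token changed."""
    return deps_token(last_deps) != deps_token(current_deps)


# Example: deploying an alertmanager daemon changes the deps computed for
# prometheus, so needs_reconfig(...) flips to True on the next serve loop.
# ---------------------------------------------------------------------------------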
e306af50 2872 @forall_hosts
adb31ebb 2873 def _remove_daemons(self, name: str, host: str) -> str:
f67539c2 2874 return CephadmServe(self)._remove_daemon(name, host)
9f95a23c 2875
adb31ebb 2876 def _check_pool_exists(self, pool: str, service_name: str) -> None:
e306af50
TL
2877 logger.info(f'Checking pool "{pool}" exists for service {service_name}')
2878 if not self.rados.pool_exists(pool):
2879 raise OrchestratorError(f'Cannot find pool "{pool}" for '
2880 f'service {service_name}')
2881
adb31ebb
TL
2882 def _add_daemon(self,
2883 daemon_type: str,
f67539c2 2884 spec: ServiceSpec) -> List[str]:
9f95a23c
TL
2885 """
2886 Add (and place) a daemon. Require explicit host placement. Do not
2887 schedule, and do not apply the related scheduling limitations.
2888 """
f67539c2
TL
2889 if spec.service_name() not in self.spec_store:
2890 raise OrchestratorError('Unable to add a Daemon without Service.\n'
2891 'Please use `ceph orch apply ...` to create a Service.\n'
2892 'Note, you might want to create the service with "unmanaged=true"')
2893
9f95a23c
TL
2894 self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
2895 if not spec.placement.hosts:
2896 raise OrchestratorError('must specify host(s) to deploy on')
2897 count = spec.placement.count or len(spec.placement.hosts)
2898 daemons = self.cache.get_daemons_by_service(spec.service_name())
2899 return self._create_daemons(daemon_type, spec, daemons,
f67539c2 2900 spec.placement.hosts, count)
9f95a23c 2901
adb31ebb
TL
2902 def _create_daemons(self,
2903 daemon_type: str,
2904 spec: ServiceSpec,
2905 daemons: List[DaemonDescription],
2906 hosts: List[HostPlacementSpec],
f67539c2 2907 count: int) -> List[str]:
9f95a23c
TL
2908 if count > len(hosts):
2909 raise OrchestratorError('too few hosts: want %d, have %s' % (
2910 count, hosts))
2911
f6b5b4d7 2912 did_config = False
f67539c2 2913 service_type = daemon_type_to_service(daemon_type)
9f95a23c 2914
f67539c2 2915 args = [] # type: List[CephadmDaemonDeploySpec]
9f95a23c
TL
2916 for host, network, name in hosts:
2917 daemon_id = self.get_unique_name(daemon_type, host, daemons,
e306af50
TL
2918 prefix=spec.service_id,
2919 forcename=name)
f6b5b4d7 2920
f67539c2 2921 if not did_config:
522d829b 2922 self.cephadm_services[service_type].config(spec)
f6b5b4d7
TL
2923 did_config = True
2924
f67539c2
TL
2925 daemon_spec = self.cephadm_services[service_type].make_daemon_spec(
2926 host, daemon_id, network, spec,
2927 # NOTE: this does not consider port conflicts!
2928 ports=spec.get_port_start())
9f95a23c
TL
2929 self.log.debug('Placing %s.%s on host %s' % (
2930 daemon_type, daemon_id, host))
f6b5b4d7 2931 args.append(daemon_spec)
9f95a23c
TL
2932
2933 # add to daemon list so next name(s) will also be unique
2934 sd = orchestrator.DaemonDescription(
2935 hostname=host,
2936 daemon_type=daemon_type,
2937 daemon_id=daemon_id,
2938 )
2939 daemons.append(sd)
2940
f67539c2 2941 @ forall_hosts
adb31ebb 2942 def create_func_map(*args: Any) -> str:
f67539c2 2943 daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args)
1e59de90
TL
2944 with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
2945 return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))
9f95a23c
TL
2946
2947 return create_func_map(args)
2948
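# --- Illustrative sketch (not the real orchestrator.forall_hosts) ----------------
# The @forall_hosts decorator used above behaves roughly like a parallel map: the
# wrapped function is called once per argument tuple and the results are collected.
# A simplified stand-in to show the shape of that pattern:
from multiprocessing.pool import ThreadPool
from typing import Any, Callable, List, Sequence, TypeVar

T = TypeVar('T')


def parallel_over_args(f: Callable[..., T]) -> Callable[[Sequence[Sequence[Any]]], List[T]]:
    def wrapper(arg_tuples: Sequence[Sequence[Any]]) -> List[T]:
        with ThreadPool() as pool:
            return pool.starmap(f, arg_tuples)
    return wrapper


@parallel_over_args
def _describe(daemon_type: str, host: str) -> str:
    return f'{daemon_type} on {host}'


# _describe([('mgr', 'host1'), ('mon', 'host2')]) -> ['mgr on host1', 'mon on host2']
# ---------------------------------------------------------------------------------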
f67539c2
TL
2949 @handle_orch_error
2950 def add_daemon(self, spec: ServiceSpec) -> List[str]:
2951 ret: List[str] = []
20effc67
TL
2952 try:
2953 with orchestrator.set_exception_subject('service', spec.service_name(), overwrite=True):
2954 for d_type in service_to_daemon_types(spec.service_type):
2955 ret.extend(self._add_daemon(d_type, spec))
2956 return ret
2957 except OrchestratorError as e:
2958 self.events.from_orch_error(e)
2959 raise
f67539c2 2960
aee94f69
TL
2961 def _get_alertmanager_credentials(self) -> Tuple[str, str]:
2962 user = self.get_store(AlertmanagerService.USER_CFG_KEY)
2963 password = self.get_store(AlertmanagerService.PASS_CFG_KEY)
2964 if user is None or password is None:
2965 user = 'admin'
2966 password = 'admin'
2967 self.set_store(AlertmanagerService.USER_CFG_KEY, user)
2968 self.set_store(AlertmanagerService.PASS_CFG_KEY, password)
2969 return (user, password)
2970
2971 def _get_prometheus_credentials(self) -> Tuple[str, str]:
2972 user = self.get_store(PrometheusService.USER_CFG_KEY)
2973 password = self.get_store(PrometheusService.PASS_CFG_KEY)
2974 if user is None or password is None:
2975 user = 'admin'
2976 password = 'admin'
2977 self.set_store(PrometheusService.USER_CFG_KEY, user)
2978 self.set_store(PrometheusService.PASS_CFG_KEY, password)
2979 return (user, password)
2980
2981 @handle_orch_error
2982 def set_prometheus_access_info(self, user: str, password: str) -> str:
2983 self.set_store(PrometheusService.USER_CFG_KEY, user)
2984 self.set_store(PrometheusService.PASS_CFG_KEY, password)
2985 return 'prometheus credentials updated correctly'
2986
2987 @handle_orch_error
2988 def set_alertmanager_access_info(self, user: str, password: str) -> str:
2989 self.set_store(AlertmanagerService.USER_CFG_KEY, user)
2990 self.set_store(AlertmanagerService.PASS_CFG_KEY, password)
2991 return 'alertmanager credentials updated correctly'
2992
1e59de90
TL
2993 @handle_orch_error
2994 def get_prometheus_access_info(self) -> Dict[str, str]:
aee94f69
TL
2995 user, password = self._get_prometheus_credentials()
2996 return {'user': user,
2997 'password': password,
1e59de90
TL
2998 'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()}
2999
3000 @handle_orch_error
3001 def get_alertmanager_access_info(self) -> Dict[str, str]:
aee94f69
TL
3002 user, password = self._get_alertmanager_credentials()
3003 return {'user': user,
3004 'password': password,
1e59de90
TL
3005 'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()}
3006
f67539c2 3007 @handle_orch_error
adb31ebb 3008 def apply_mon(self, spec: ServiceSpec) -> str:
9f95a23c
TL
3009 return self._apply(spec)
3010
e306af50
TL
3011 def _apply(self, spec: GenericSpec) -> str:
3012 if spec.service_type == 'host':
3013 return self._add_host(cast(HostSpec, spec))
9f95a23c 3014
f6b5b4d7
TL
3015 if spec.service_type == 'osd':
3016 # _trigger preview refresh needs to be smart and
3017 # should only refresh if a change has been detected
3018 self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])
3019
e306af50 3020 return self._apply_service_spec(cast(ServiceSpec, spec))
9f95a23c 3021
39ae355f
TL
3022 def _get_candidate_hosts(self, placement: PlacementSpec) -> List[str]:
3023 """Return a list of candidate hosts according to the placement specification."""
3024 all_hosts = self.cache.get_schedulable_hosts()
39ae355f
TL
3025 candidates = []
3026 if placement.hosts:
3027 candidates = [h.hostname for h in placement.hosts if h.hostname in placement.hosts]
3028 elif placement.label:
3029 candidates = [x.hostname for x in [h for h in all_hosts if placement.label in h.labels]]
3030 elif placement.host_pattern:
3031 candidates = [x for x in placement.filter_matching_hostspecs(all_hosts)]
3032 elif (placement.count is not None or placement.count_per_host is not None):
3033 candidates = [x.hostname for x in all_hosts]
aee94f69 3034 return [h for h in candidates if not self.cache.is_host_draining(h)]
39ae355f
TL
3035
3036 def _validate_one_shot_placement_spec(self, spec: PlacementSpec) -> None:
3037 """Validate placement specification for TunedProfileSpec and ClientKeyringSpec."""
3038 if spec.count is not None:
3039 raise OrchestratorError(
3040 "Placement 'count' field is no supported for this specification.")
3041 if spec.count_per_host is not None:
3042 raise OrchestratorError(
3043 "Placement 'count_per_host' field is no supported for this specification.")
3044 if spec.hosts:
3045 all_hosts = [h.hostname for h in self.inventory.all_specs()]
3046 invalid_hosts = [h.hostname for h in spec.hosts if h.hostname not in all_hosts]
3047 if invalid_hosts:
3048 raise OrchestratorError(f"Found invalid host(s) in placement section: {invalid_hosts}. "
3049 f"Please check 'ceph orch host ls' for available hosts.")
3050 elif not self._get_candidate_hosts(spec):
3051 raise OrchestratorError("Invalid placement specification. No host(s) matched placement spec.\n"
3052 "Please check 'ceph orch host ls' for available hosts.\n"
3053 "Note: draining hosts are excluded from the candidate list.")
3054
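# --- Illustrative example (assumes ceph.deployment from this tree is importable) -
# The validation above rejects count-based placements for one-shot specs such as
# tuned profiles and client keyrings, while host- or label-based placements pass
# (provided they match at least one non-draining host). The label value below is
# made up.
from ceph.deployment.service_spec import PlacementSpec

rejected = PlacementSpec(count=3)                # would raise OrchestratorError above
accepted = PlacementSpec(label='tuned-latency')  # validated against the inventory
# ---------------------------------------------------------------------------------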
3055 def _validate_tunedprofile_settings(self, spec: TunedProfileSpec) -> Dict[str, List[str]]:
3056 candidate_hosts = spec.placement.filter_matching_hostspecs(self.inventory.all_specs())
3057 invalid_options: Dict[str, List[str]] = {}
3058 for host in candidate_hosts:
3059 host_sysctl_options = self.cache.get_facts(host).get('sysctl_options', {})
3060 invalid_options[host] = []
3061 for option in spec.settings:
3062 if option not in host_sysctl_options:
3063 invalid_options[host].append(option)
3064 return invalid_options
3065
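# --- Illustrative example of the mapping returned above ---------------------------
# One entry per candidate host, listing the sysctl options from the spec that the
# host does not report in its gathered facts; the caller rejects the profile if any
# of the lists is non-empty. Host names and option names below are made up.
invalid_options_example = {
    'host1': [],                      # all requested sysctls exist on host1
    'host2': ['vm.swappiness_typo'],  # unknown option -> profile is rejected
}
assert any(e for e in invalid_options_example.values())
# ---------------------------------------------------------------------------------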
3066 def _validate_tuned_profile_spec(self, spec: TunedProfileSpec) -> None:
3067 if not spec.settings:
3068 raise OrchestratorError("Invalid spec: settings section cannot be empty.")
3069 self._validate_one_shot_placement_spec(spec.placement)
3070 invalid_options = self._validate_tunedprofile_settings(spec)
3071 if any(e for e in invalid_options.values()):
3072 raise OrchestratorError(
3073 f'Failed to apply tuned profile. Invalid sysctl option(s) for host(s) detected: {invalid_options}')
3074
2a845540
TL
3075 @handle_orch_error
3076 def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool = False) -> str:
3077 outs = []
3078 for spec in specs:
39ae355f 3079 self._validate_tuned_profile_spec(spec)
2a845540 3080 if no_overwrite and self.tuned_profiles.exists(spec.profile_name):
39ae355f
TL
3081 outs.append(
3082 f"Tuned profile '{spec.profile_name}' already exists (--no-overwrite was passed)")
2a845540 3083 else:
39ae355f 3084 # done, let's save the specs
2a845540
TL
3085 self.tuned_profiles.add_profile(spec)
3086 outs.append(f'Saved tuned profile {spec.profile_name}')
3087 self._kick_serve_loop()
3088 return '\n'.join(outs)
3089
3090 @handle_orch_error
3091 def rm_tuned_profile(self, profile_name: str) -> str:
3092 if profile_name not in self.tuned_profiles:
3093 raise OrchestratorError(
3094 f'Tuned profile {profile_name} does not exist. Nothing to remove.')
3095 self.tuned_profiles.rm_profile(profile_name)
3096 self._kick_serve_loop()
3097 return f'Removed tuned profile {profile_name}'
3098
3099 @handle_orch_error
3100 def tuned_profile_ls(self) -> List[TunedProfileSpec]:
3101 return self.tuned_profiles.list_profiles()
3102
3103 @handle_orch_error
3104 def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> str:
3105 if profile_name not in self.tuned_profiles:
3106 raise OrchestratorError(
3107 f'Tuned profile {profile_name} does not exist. Cannot add setting.')
3108 self.tuned_profiles.add_setting(profile_name, setting, value)
3109 self._kick_serve_loop()
3110 return f'Added setting {setting} with value {value} to tuned profile {profile_name}'
3111
3112 @handle_orch_error
3113 def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> str:
3114 if profile_name not in self.tuned_profiles:
3115 raise OrchestratorError(
3116 f'Tuned profile {profile_name} does not exist. Cannot remove setting.')
3117 self.tuned_profiles.rm_setting(profile_name, setting)
3118 self._kick_serve_loop()
3119 return f'Removed setting {setting} from tuned profile {profile_name}'
3120
1e59de90
TL
3121 @handle_orch_error
3122 def service_discovery_dump_cert(self) -> str:
3123 root_cert = self.get_store(ServiceDiscovery.KV_STORE_SD_ROOT_CERT)
3124 if not root_cert:
3125 raise OrchestratorError('No certificate found for service discovery')
3126 return root_cert
3127
a4b75251
TL
3128 def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None:
3129 self.health_checks[name] = {
3130 'severity': 'warning',
3131 'summary': summary,
3132 'count': count,
3133 'detail': detail,
3134 }
3135 self.set_health_checks(self.health_checks)
3136
3137 def remove_health_warning(self, name: str) -> None:
3138 if name in self.health_checks:
3139 del self.health_checks[name]
3140 self.set_health_checks(self.health_checks)
3141
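# --- Illustrative example: per-check structure stored by set_health_warning() -----
# as handed to MgrModule.set_health_checks(); the check name and values below are
# made up.
example_health_checks = {
    'CEPHADM_EXAMPLE_WARNING': {
        'severity': 'warning',
        'summary': '1 host failed an example check',
        'count': 1,
        'detail': ['host1: example detail line'],
    }
}
# ---------------------------------------------------------------------------------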
adb31ebb 3142 def _plan(self, spec: ServiceSpec) -> dict:
f6b5b4d7
TL
3143 if spec.service_type == 'osd':
3144 return {'service_name': spec.service_name(),
3145 'service_type': spec.service_type,
3146 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
3147
b3b6e05e 3148 svc = self.cephadm_services[spec.service_type]
f6b5b4d7
TL
3149 ha = HostAssignment(
3150 spec=spec,
20effc67
TL
3151 hosts=self.cache.get_schedulable_hosts(),
3152 unreachable_hosts=self.cache.get_unreachable_hosts(),
2a845540 3153 draining_hosts=self.cache.get_draining_hosts(),
f67539c2
TL
3154 networks=self.cache.networks,
3155 daemons=self.cache.get_daemons_by_service(spec.service_name()),
b3b6e05e
TL
3156 allow_colo=svc.allow_colo(),
3157 rank_map=self.spec_store[spec.service_name()].rank_map if svc.ranked() else None
f6b5b4d7
TL
3158 )
3159 ha.validate()
f67539c2 3160 hosts, to_add, to_remove = ha.place()
f6b5b4d7
TL
3161
3162 return {
3163 'service_name': spec.service_name(),
3164 'service_type': spec.service_type,
f67539c2 3165 'add': [hs.hostname for hs in to_add],
b3b6e05e 3166 'remove': [d.name() for d in to_remove]
f6b5b4d7
TL
3167 }
3168
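# --- Illustrative example: shape of one dry-run entry produced by _plan() ---------
# for a non-osd spec (values are made up).
example_plan_entry = {
    'service_name': 'mds.cephfs',
    'service_type': 'mds',
    'add': ['host3'],   # hostnames the scheduler would place new daemons on
    'remove': [],       # daemon names that would be removed
}
# ---------------------------------------------------------------------------------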
f67539c2
TL
3169 @handle_orch_error
3170 def plan(self, specs: Sequence[GenericSpec]) -> List:
f6b5b4d7 3171 results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
20effc67 3172 'to the current inventory setup. If any of these conditions change, the \n'
f6b5b4d7
TL
3173 'preview will be invalid. Please keep the timeframe between \n'
3174 'planning and applying the specs as short as possible.'}]
3175 if any([spec.service_type == 'host' for spec in specs]):
3176 return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported yet.'}]
3177 for spec in specs:
3178 results.append(self._plan(cast(ServiceSpec, spec)))
3179 return results
3180
e306af50 3181 def _apply_service_spec(self, spec: ServiceSpec) -> str:
9f95a23c
TL
3182 if spec.placement.is_empty():
3183 # fill in default placement
3184 defaults = {
3185 'mon': PlacementSpec(count=5),
3186 'mgr': PlacementSpec(count=2),
3187 'mds': PlacementSpec(count=2),
3188 'rgw': PlacementSpec(count=2),
f67539c2 3189 'ingress': PlacementSpec(count=2),
1911f103 3190 'iscsi': PlacementSpec(count=1),
aee94f69 3191 'nvmeof': PlacementSpec(count=1),
9f95a23c 3192 'rbd-mirror': PlacementSpec(count=2),
f67539c2 3193 'cephfs-mirror': PlacementSpec(count=1),
801d1391 3194 'nfs': PlacementSpec(count=1),
9f95a23c
TL
3195 'grafana': PlacementSpec(count=1),
3196 'alertmanager': PlacementSpec(count=1),
3197 'prometheus': PlacementSpec(count=1),
3198 'node-exporter': PlacementSpec(host_pattern='*'),
39ae355f 3199 'ceph-exporter': PlacementSpec(host_pattern='*'),
33c7a0ef
TL
3200 'loki': PlacementSpec(count=1),
3201 'promtail': PlacementSpec(host_pattern='*'),
9f95a23c 3202 'crash': PlacementSpec(host_pattern='*'),
f91f0fd5 3203 'container': PlacementSpec(count=1),
20effc67 3204 'snmp-gateway': PlacementSpec(count=1),
1e59de90
TL
3205 'elasticsearch': PlacementSpec(count=1),
3206 'jaeger-agent': PlacementSpec(host_pattern='*'),
3207 'jaeger-collector': PlacementSpec(count=1),
3208 'jaeger-query': PlacementSpec(count=1)
9f95a23c
TL
3209 }
3210 spec.placement = defaults[spec.service_type]
3211 elif spec.service_type in ['mon', 'mgr'] and \
f91f0fd5
TL
3212 spec.placement.count is not None and \
3213 spec.placement.count < 1:
9f95a23c
TL
3214 raise OrchestratorError('cannot scale %s service below 1' % (
3215 spec.service_type))
3216
f67539c2
TL
3217 host_count = len(self.inventory.keys())
3218 max_count = self.max_count_per_host
3219
3220 if spec.placement.count is not None:
3221 if spec.service_type in ['mon', 'mgr']:
3222 if spec.placement.count > max(5, host_count):
3223 raise OrchestratorError(
3224 (f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {max(5, host_count)}.'))
3225 elif spec.service_type != 'osd':
3226 if spec.placement.count > (max_count * host_count):
3227 raise OrchestratorError((f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {host_count*max_count} ({host_count}x{max_count}).'
3228 + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))
3229
3230 if spec.placement.count_per_host is not None and spec.placement.count_per_host > max_count and spec.service_type != 'osd':
3231 raise OrchestratorError((f'The maximum count_per_host allowed is {max_count}.'
3232 + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))
3233
9f95a23c
TL
3234 HostAssignment(
3235 spec=spec,
f91f0fd5 3236 hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh
20effc67 3237 unreachable_hosts=self.cache.get_unreachable_hosts(),
2a845540 3238 draining_hosts=self.cache.get_draining_hosts(),
f67539c2
TL
3239 networks=self.cache.networks,
3240 daemons=self.cache.get_daemons_by_service(spec.service_name()),
3241 allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
9f95a23c
TL
3242 ).validate()
3243
3244 self.log.info('Saving service %s spec with placement %s' % (
3245 spec.service_name(), spec.placement.pretty_str()))
3246 self.spec_store.save(spec)
3247 self._kick_serve_loop()
1911f103 3248 return "Scheduled %s update..." % spec.service_name()
9f95a23c 3249
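# --- Illustrative example (assumes ceph.deployment from this tree is importable) -
# A spec submitted with an empty placement picks up the defaults table above,
# e.g. an 'mds' spec ends up with count=2 and 'node-exporter' with host_pattern='*'.
from ceph.deployment.service_spec import PlacementSpec

empty = PlacementSpec()
assert empty.is_empty()
default_for_mds = PlacementSpec(count=2)   # what the defaults table would assign
# ---------------------------------------------------------------------------------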
f67539c2
TL
3250 @handle_orch_error
3251 def apply(self, specs: Sequence[GenericSpec], no_overwrite: bool = False) -> List[str]:
e306af50
TL
3252 results = []
3253 for spec in specs:
f67539c2
TL
3254 if no_overwrite:
3255 if spec.service_type == 'host' and cast(HostSpec, spec).hostname in self.inventory:
3256 results.append('Skipped %s host spec. To change %s spec omit --no-overwrite flag'
3257 % (cast(HostSpec, spec).hostname, spec.service_type))
3258 continue
3259 elif cast(ServiceSpec, spec).service_name() in self.spec_store:
3260 results.append('Skipped %s service spec. To change %s spec omit --no-overwrite flag'
3261 % (cast(ServiceSpec, spec).service_name(), cast(ServiceSpec, spec).service_name()))
3262 continue
e306af50
TL
3263 results.append(self._apply(spec))
3264 return results
9f95a23c 3265
f67539c2 3266 @handle_orch_error
adb31ebb 3267 def apply_mgr(self, spec: ServiceSpec) -> str:
9f95a23c
TL
3268 return self._apply(spec)
3269
f67539c2 3270 @handle_orch_error
f6b5b4d7 3271 def apply_mds(self, spec: ServiceSpec) -> str:
9f95a23c
TL
3272 return self._apply(spec)
3273
f67539c2 3274 @handle_orch_error
adb31ebb 3275 def apply_rgw(self, spec: ServiceSpec) -> str:
9f95a23c
TL
3276 return self._apply(spec)
3277
f67539c2
TL
3278 @handle_orch_error
3279 def apply_ingress(self, spec: ServiceSpec) -> str:
3280 return self._apply(spec)
1911f103 3281
f67539c2 3282 @handle_orch_error
adb31ebb 3283 def apply_iscsi(self, spec: ServiceSpec) -> str:
1911f103
TL
3284 return self._apply(spec)
3285
f67539c2 3286 @handle_orch_error
adb31ebb 3287 def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
9f95a23c
TL
3288 return self._apply(spec)
3289
f67539c2 3290 @handle_orch_error
adb31ebb 3291 def apply_nfs(self, spec: ServiceSpec) -> str:
801d1391
TL
3292 return self._apply(spec)
3293
9f95a23c
TL
3294 def _get_dashboard_url(self):
3295 # type: () -> str
3296 return self.get('mgr_map').get('services', {}).get('dashboard', '')
3297
f67539c2 3298 @handle_orch_error
adb31ebb 3299 def apply_prometheus(self, spec: ServiceSpec) -> str:
9f95a23c
TL
3300 return self._apply(spec)
3301
33c7a0ef
TL
3302 @handle_orch_error
3303 def apply_loki(self, spec: ServiceSpec) -> str:
3304 return self._apply(spec)
3305
3306 @handle_orch_error
3307 def apply_promtail(self, spec: ServiceSpec) -> str:
3308 return self._apply(spec)
3309
f67539c2 3310 @handle_orch_error
adb31ebb 3311 def apply_node_exporter(self, spec: ServiceSpec) -> str:
9f95a23c
TL
3312 return self._apply(spec)
3313
39ae355f
TL
3314 @handle_orch_error
3315 def apply_ceph_exporter(self, spec: ServiceSpec) -> str:
3316 return self._apply(spec)
3317
f67539c2 3318 @handle_orch_error
adb31ebb 3319 def apply_crash(self, spec: ServiceSpec) -> str:
9f95a23c
TL
3320 return self._apply(spec)
3321
f67539c2 3322 @handle_orch_error
f6b5b4d7 3323 def apply_grafana(self, spec: ServiceSpec) -> str:
9f95a23c
TL
3324 return self._apply(spec)
3325
f67539c2 3326 @handle_orch_error
f6b5b4d7 3327 def apply_alertmanager(self, spec: ServiceSpec) -> str:
9f95a23c
TL
3328 return self._apply(spec)
3329
f67539c2 3330 @handle_orch_error
f91f0fd5
TL
3331 def apply_container(self, spec: ServiceSpec) -> str:
3332 return self._apply(spec)
3333
f67539c2 3334 @handle_orch_error
20effc67 3335 def apply_snmp_gateway(self, spec: ServiceSpec) -> str:
f67539c2
TL
3336 return self._apply(spec)
3337
1e59de90
TL
3338 @handle_orch_error
3339 def set_unmanaged(self, service_name: str, value: bool) -> str:
3340 return self.spec_store.set_unmanaged(service_name, value)
3341
f67539c2 3342 @handle_orch_error
adb31ebb 3343 def upgrade_check(self, image: str, version: str) -> str:
f67539c2
TL
3344 if self.inventory.get_host_with_state("maintenance"):
3345 raise OrchestratorError("check aborted - you have hosts in maintenance state")
3346
9f95a23c
TL
3347 if version:
3348 target_name = self.container_image_base + ':v' + version
3349 elif image:
3350 target_name = image
3351 else:
3352 raise OrchestratorError('must specify either image or version')
3353
1e59de90
TL
3354 with self.async_timeout_handler(cmd=f'cephadm inspect-image (image {target_name})'):
3355 image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name))
b3b6e05e
TL
3356
3357 ceph_image_version = image_info.ceph_version
3358 if not ceph_image_version:
3359 return f'Unable to extract ceph version from {target_name}.'
3360 if ceph_image_version.startswith('ceph version '):
3361 ceph_image_version = ceph_image_version.split(' ')[2]
3362 version_error = self.upgrade._check_target_version(ceph_image_version)
3363 if version_error:
3364 return f'Incompatible upgrade: {version_error}'
3365
f91f0fd5 3366 self.log.debug(f'image info {image} -> {image_info}')
adb31ebb 3367 r: dict = {
9f95a23c 3368 'target_name': target_name,
f91f0fd5
TL
3369 'target_id': image_info.image_id,
3370 'target_version': image_info.ceph_version,
9f95a23c
TL
3371 'needs_update': dict(),
3372 'up_to_date': list(),
f67539c2 3373 'non_ceph_image_daemons': list()
9f95a23c
TL
3374 }
3375 for host, dm in self.cache.daemons.items():
3376 for name, dd in dm.items():
1e59de90
TL
3377 # check if the container digest for the digest we're checking upgrades for matches
3378 # the container digests for the daemon if "use_repo_digest" setting is true
3379 # or that the image name matches the daemon's image name if "use_repo_digest"
3380 # is false. The idea is to generally check if the daemon is already using
3381 # the image we're checking upgrade to.
3382 if (
3383 (self.use_repo_digest and dd.matches_digests(image_info.repo_digests))
3384 or (not self.use_repo_digest and dd.matches_image_name(image))
3385 ):
9f95a23c 3386 r['up_to_date'].append(dd.name())
a4b75251 3387 elif dd.daemon_type in CEPH_IMAGE_TYPES:
9f95a23c
TL
3388 r['needs_update'][dd.name()] = {
3389 'current_name': dd.container_image_name,
3390 'current_id': dd.container_image_id,
3391 'current_version': dd.version,
3392 }
f67539c2
TL
3393 else:
3394 r['non_ceph_image_daemons'].append(dd.name())
3395 if self.use_repo_digest and image_info.repo_digests:
3396 # FIXME: we assume the first digest is the best one to use
3397 r['target_digest'] = image_info.repo_digests[0]
f91f0fd5 3398
9f95a23c
TL
3399 return json.dumps(r, indent=4, sort_keys=True)
3400
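# --- Illustrative example: shape of the JSON report returned by upgrade_check() ---
# (all values below are made up).
example_upgrade_check_report = {
    'target_name': 'quay.io/ceph/ceph:v18.2.4',
    'target_id': 'image-id-of-target',
    'target_version': '18.2.4',
    'needs_update': {
        'osd.3': {'current_name': 'quay.io/ceph/ceph:v18.2.2',
                  'current_id': 'image-id-of-current',
                  'current_version': '18.2.2'},
    },
    'up_to_date': ['mgr.host1.abcdef'],
    'non_ceph_image_daemons': ['node-exporter.host1'],
    # 'target_digest' is added when use_repo_digest is set and digests are known
}
# ---------------------------------------------------------------------------------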
f67539c2 3401 @handle_orch_error
f6b5b4d7 3402 def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
e306af50 3403 return self.upgrade.upgrade_status()
9f95a23c 3404
a4b75251 3405 @handle_orch_error
33c7a0ef
TL
3406 def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict[Any, Any]:
3407 return self.upgrade.upgrade_ls(image, tags, show_all_versions)
a4b75251 3408
f67539c2 3409 @handle_orch_error
33c7a0ef
TL
3410 def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None,
3411 services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
f67539c2 3412 if self.inventory.get_host_with_state("maintenance"):
39ae355f
TL
3413 raise OrchestratorError("Upgrade aborted - you have host(s) in maintenance state")
3414 if self.offline_hosts:
aee94f69
TL
3415 raise OrchestratorError(
3416 f"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}")
33c7a0ef
TL
3417 if daemon_types is not None and services is not None:
3418 raise OrchestratorError('--daemon-types and --services are mutually exclusive')
3419 if daemon_types is not None:
3420 for dtype in daemon_types:
3421 if dtype not in CEPH_UPGRADE_ORDER:
3422 raise OrchestratorError(f'Upgrade aborted - Got unexpected daemon type "{dtype}".\n'
3423 f'Viable daemon types for this command are: {utils.CEPH_TYPES + utils.GATEWAY_TYPES}')
3424 if services is not None:
3425 for service in services:
3426 if service not in self.spec_store:
3427 raise OrchestratorError(f'Upgrade aborted - Got unknown service name "{service}".\n'
3428 f'Known services are: {self.spec_store.all_specs.keys()}')
3429 hosts: Optional[List[str]] = None
3430 if host_placement is not None:
3431 all_hosts = list(self.inventory.all_specs())
3432 placement = PlacementSpec.from_string(host_placement)
3433 hosts = placement.filter_matching_hostspecs(all_hosts)
3434 if not hosts:
3435 raise OrchestratorError(
3436 f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts')
3437
3438 if limit is not None:
3439 if limit < 1:
3440 raise OrchestratorError(
3441 f'Upgrade aborted - --limit arg must be a positive integer, not {limit}')
3442
3443 return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit)
9f95a23c 3444
f67539c2 3445 @handle_orch_error
f6b5b4d7 3446 def upgrade_pause(self) -> str:
e306af50 3447 return self.upgrade.upgrade_pause()
9f95a23c 3448
f67539c2 3449 @handle_orch_error
f6b5b4d7 3450 def upgrade_resume(self) -> str:
e306af50 3451 return self.upgrade.upgrade_resume()
9f95a23c 3452
f67539c2 3453 @handle_orch_error
f6b5b4d7 3454 def upgrade_stop(self) -> str:
e306af50 3455 return self.upgrade.upgrade_stop()
9f95a23c 3456
f67539c2 3457 @handle_orch_error
9f95a23c
TL
3458 def remove_osds(self, osd_ids: List[str],
3459 replace: bool = False,
a4b75251 3460 force: bool = False,
1e59de90
TL
3461 zap: bool = False,
3462 no_destroy: bool = False) -> str:
9f95a23c
TL
3463 """
3464 Takes a list of OSDs and schedules them for removal.
3465 The function that takes care of the actual removal is
f6b5b4d7 3466 process_removal_queue().
9f95a23c
TL
3467 """
3468
f6b5b4d7
TL
3469 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
3470 to_remove_daemons = list()
9f95a23c 3471 for daemon in daemons:
f6b5b4d7
TL
3472 if daemon.daemon_id in osd_ids:
3473 to_remove_daemons.append(daemon)
3474 if not to_remove_daemons:
3475 return f"Unable to find OSDs: {osd_ids}"
9f95a23c 3476
f6b5b4d7 3477 for daemon in to_remove_daemons:
f67539c2 3478 assert daemon.daemon_id is not None
f6b5b4d7
TL
3479 try:
3480 self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
3481 replace=replace,
3482 force=force,
a4b75251 3483 zap=zap,
1e59de90 3484 no_destroy=no_destroy,
f6b5b4d7 3485 hostname=daemon.hostname,
adb31ebb
TL
3486 process_started_at=datetime_now(),
3487 remove_util=self.to_remove_osds.rm_util))
f6b5b4d7
TL
3488 except NotFoundError:
3489 return f"Unable to find OSDs: {osd_ids}"
9f95a23c
TL
3490
3491 # trigger the serve loop to initiate the removal
3492 self._kick_serve_loop()
2a845540
TL
3493 warning_zap = "" if zap else ("\nVG/LV for the OSDs won't be zapped (--zap wasn't passed).\n"
3494 "Run the `ceph-volume lvm zap` command with `--destroy`"
3495 " against the VG/LV if you want them to be destroyed.")
3496 return f"Scheduled OSD(s) for removal.{warning_zap}"
9f95a23c 3497
f67539c2 3498 @handle_orch_error
adb31ebb 3499 def stop_remove_osds(self, osd_ids: List[str]) -> str:
f6b5b4d7
TL
3500 """
3501 Stop the removal process for a list of OSDs.
3502 This reverts their weight and removes them from the osds_to_remove queue.
3503 """
3504 for osd_id in osd_ids:
3505 try:
f38dd50b 3506 self.to_remove_osds.rm_by_osd_id(int(osd_id))
f67539c2 3507 except (NotFoundError, KeyError, ValueError):
f6b5b4d7
TL
3508 return f'Unable to find OSD in the queue: {osd_id}'
3509
3510 # trigger the serve loop to halt the removal
3511 self._kick_serve_loop()
3512 return "Stopped OSD(s) removal"
3513
f67539c2 3514 @handle_orch_error
adb31ebb 3515 def remove_osds_status(self) -> List[OSD]:
9f95a23c
TL
3516 """
3517 The CLI call to retrieve an osd removal report
3518 """
f6b5b4d7 3519 return self.to_remove_osds.all_osds()
522d829b
TL
3520
3521 @handle_orch_error
aee94f69 3522 def drain_host(self, hostname: str, force: bool = False, keep_conf_keyring: bool = False, zap_osd_devices: bool = False) -> str:
522d829b
TL
3523 """
3524 Drain all daemons from a host.
3525 :param hostname: host name
3526 """
33c7a0ef
TL
3527
3528 # if we drain the last admin host we could end up removing the only instance
3529 # of the config and keyring and cause issues
3530 if not force:
aee94f69 3531 p = PlacementSpec(label=SpecialHostLabels.ADMIN)
33c7a0ef
TL
3532 admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
3533 if len(admin_hosts) == 1 and admin_hosts[0] == hostname:
aee94f69 3534 raise OrchestratorValidationError(f"Host {hostname} is the last host with the '{SpecialHostLabels.ADMIN}'"
33c7a0ef
TL
3535 " label.\nDraining this host could cause the removal"
3536 " of the last cluster config/keyring managed by cephadm.\n"
aee94f69 3537 f"It is recommended to add the {SpecialHostLabels.ADMIN} label to another host"
33c7a0ef
TL
3538 " before completing this operation.\nIf you're certain this is"
3539 " what you want rerun this command with --force.")
f38dd50b
TL
3540 # if the user has specified the host we are going to drain
3541 # explicitly in any service spec, warn the user. Having a
3542 # drained host listed in a placement provides no value, so
3543 # they may want to fix it.
3544 services_matching_drain_host: List[str] = []
3545 for sname, sspec in self.spec_store.all_specs.items():
3546 if sspec.placement.hosts and hostname in [h.hostname for h in sspec.placement.hosts]:
3547 services_matching_drain_host.append(sname)
3548 if services_matching_drain_host:
3549 raise OrchestratorValidationError(f'Host {hostname} was found explicitly listed in the placements '
3550 f'of services:\n {services_matching_drain_host}.\nPlease update those '
3551 'specs to not list this host.\nThis warning can be bypassed with --force')
33c7a0ef 3552
522d829b 3553 self.add_host_label(hostname, '_no_schedule')
aee94f69
TL
3554 if not keep_conf_keyring:
3555 self.add_host_label(hostname, SpecialHostLabels.DRAIN_CONF_KEYRING)
522d829b
TL
3556
3557 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_host(hostname)
3558
3559 osds_to_remove = [d.daemon_id for d in daemons if d.daemon_type == 'osd']
aee94f69 3560 self.remove_osds(osds_to_remove, zap=zap_osd_devices)
522d829b
TL
3561
3562 daemons_table = ""
3563 daemons_table += "{:<20} {:<15}\n".format("type", "id")
3564 daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
3565 for d in daemons:
3566 daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)
3567
3568 return "Scheduled to remove the following daemons from host '{}'\n{}".format(hostname, daemons_table)
3569
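# --- Illustrative example: the plain-text summary table built by drain_host() -----
# (daemon types and ids below are made up).
daemons_table_example = ""
daemons_table_example += "{:<20} {:<15}\n".format("type", "id")
daemons_table_example += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
for d_type, d_id in [('osd', '3'), ('crash', 'host2')]:
    daemons_table_example += "{:<20} {:<15}\n".format(d_type, d_id)
# -> an aligned two-column listing of the daemons scheduled for removal
# ---------------------------------------------------------------------------------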
3570 def trigger_connect_dashboard_rgw(self) -> None:
3571 self.need_connect_dashboard_rgw = True
3572 self.event.set()