import logging
import uuid
from collections import defaultdict
-from contextlib import contextmanager
-from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator
-
-from cephadm import remotes
-
-try:
- import remoto
- import execnet.gateway_bootstrap
-except ImportError:
- remoto = None
+from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
+ DefaultDict
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec
-from ceph.utils import str_to_datetime, datetime_now
+from ceph.utils import datetime_now
import orchestrator
from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
- from remoto.backends import BaseConnection
logger = logging.getLogger(__name__)
self.mgr.config_checker.load_network_config()
while self.mgr.run:
+ self.log.debug("serve loop start")
try:
self._purge_deleted_services()
+ self._check_for_moved_osds()
+
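+ # give the agent helpers a chance to react to a changed use_agent setting
+ # before doing anything else this iteration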
+ if self.mgr.agent_helpers._handle_use_agent_setting():
+ continue
+
if self.mgr.upgrade.continue_upgrade():
continue
if e.event_subject:
self.mgr.events.from_orch_error(e)
+ self.log.debug("serve loop sleep")
self._serve_sleep()
+ self.log.debug("serve loop wake")
self.log.debug("serve exit")
def _serve_sleep(self) -> None:
self.mgr.event.clear()
def _update_paused_health(self) -> None:
+ self.log.debug('_update_paused_health')
if self.mgr.paused:
- self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, ["'ceph orch resume' to resume"])
+ self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, [
+ "'ceph orch resume' to resume"])
else:
self.mgr.remove_health_warning('CEPHADM_PAUSED')
self.mgr.cache.update_autotune(host)
def _refresh_hosts_and_daemons(self) -> None:
+ self.log.debug('_refresh_hosts_and_daemons')
bad_hosts = []
failures = []
- # host -> path -> (mode, uid, gid, content, digest)
- client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}
-
- # ceph.conf
if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
- config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
- config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())
+ client_files = self._calc_client_files()
+ else:
+ client_files = {}
- if self.mgr.manage_etc_ceph_ceph_conf:
- try:
- pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
- ha = HostAssignment(
- spec=ServiceSpec('mon', placement=pspec),
- hosts=self.mgr._schedulable_hosts(),
- unreachable_hosts=self.mgr._unreachable_hosts(),
- daemons=[],
- networks=self.mgr.cache.networks,
- )
- all_slots, _, _ = ha.place()
- for host in {s.hostname for s in all_slots}:
- if host not in client_files:
- client_files[host] = {}
- client_files[host]['/etc/ceph/ceph.conf'] = (
- 0o644, 0, 0, bytes(config), str(config_digest)
- )
- except Exception as e:
- self.mgr.log.warning(
- f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')
-
- # client keyrings
- for ks in self.mgr.keys.keys.values():
- assert config
- assert config_digest
- try:
- ret, keyring, err = self.mgr.mon_command({
- 'prefix': 'auth get',
- 'entity': ks.entity,
- })
- if ret:
- self.log.warning(f'unable to fetch keyring for {ks.entity}')
- continue
- digest = ''.join('%02x' % c for c in hashlib.sha256(
- keyring.encode('utf-8')).digest())
- ha = HostAssignment(
- spec=ServiceSpec('mon', placement=ks.placement),
- hosts=self.mgr._schedulable_hosts(),
- unreachable_hosts=self.mgr._unreachable_hosts(),
- daemons=[],
- networks=self.mgr.cache.networks,
- )
- all_slots, _, _ = ha.place()
- for host in {s.hostname for s in all_slots}:
- if host not in client_files:
- client_files[host] = {}
- client_files[host]['/etc/ceph/ceph.conf'] = (
- 0o644, 0, 0, bytes(config), str(config_digest)
- )
- client_files[host][ks.path] = (
- ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest
- )
- except Exception as e:
- self.log.warning(
- f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
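+ # hosts whose agent appears down; used below to fall back to a direct
+ # (ssh-based) metadata refresh and to raise a health warning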
+ agents_down: List[str] = []
@forall_hosts
def refresh(host: str) -> None:
if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
return
+ if self.mgr.use_agent:
+ if self.mgr.agent_helpers._check_agent(host):
+ agents_down.append(host)
+
if self.mgr.cache.host_needs_check(host):
r = self._check_host(host)
if r is not None:
bad_hosts.append(r)
- if self.mgr.cache.host_needs_daemon_refresh(host):
- self.log.debug('refreshing %s daemons' % host)
- r = self._refresh_host_daemons(host)
- if r:
- failures.append(r)
- if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
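+ # with the agent enabled, host metadata normally comes from the agent itself;
+ # only do an ssh-based refresh when the agent is not in use, the host is
+ # draining, or the host's agent is down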
+ if (
+ not self.mgr.use_agent
+ or host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]
+ or host in agents_down
+ ):
+ if self.mgr.cache.host_needs_daemon_refresh(host):
+ self.log.debug('refreshing %s daemons' % host)
+ r = self._refresh_host_daemons(host)
+ if r:
+ failures.append(r)
+
+ if self.mgr.cache.host_needs_facts_refresh(host):
+ self.log.debug('Refreshing %s facts' % host)
+ r = self._refresh_facts(host)
+ if r:
+ failures.append(r)
+
+ if self.mgr.cache.host_needs_network_refresh(host):
+ self.log.debug('Refreshing %s networks' % host)
+ r = self._refresh_host_networks(host)
+ if r:
+ failures.append(r)
+
+ if self.mgr.cache.host_needs_device_refresh(host):
+ self.log.debug('refreshing %s devices' % host)
+ r = self._refresh_host_devices(host)
+ if r:
+ failures.append(r)
+ self.mgr.cache.metadata_up_to_date[host] = True
+ elif not self.mgr.cache.get_daemons_by_type('agent', host=host):
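+ # agent mode is enabled but no agent daemon exists on this host yet;
+ # do a direct daemon refresh in the meantime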
+ if self.mgr.cache.host_needs_daemon_refresh(host):
+ self.log.debug('refreshing %s daemons' % host)
+ r = self._refresh_host_daemons(host)
+ if r:
+ failures.append(r)
+ self.mgr.cache.metadata_up_to_date[host] = True
+
+ if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'):
self.log.debug(f"Logging `{host}` into custom registry")
- r = self._registry_login(host, self.mgr.registry_url,
- self.mgr.registry_username, self.mgr.registry_password)
+ r = self.mgr.wait_async(self._registry_login(
+ host, json.loads(str(self.mgr.get_store('registry_credentials')))))
if r:
bad_hosts.append(r)
- if self.mgr.cache.host_needs_device_refresh(host):
- self.log.debug('refreshing %s devices' % host)
- r = self._refresh_host_devices(host)
- if r:
- failures.append(r)
-
- if self.mgr.cache.host_needs_facts_refresh(host):
- self.log.debug(('Refreshing %s facts' % host))
- r = self._refresh_facts(host)
- if r:
- failures.append(r)
-
if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
self.log.debug(f"refreshing OSDSpec previews for {host}")
r = self._refresh_host_osdspec_previews(host)
self.log.debug(f"autotuning memory for {host}")
self._autotune_host_memory(host)
- # client files
- updated_files = False
- old_files = self.mgr.cache.get_host_client_files(host).copy()
- for path, m in client_files.get(host, {}).items():
- mode, uid, gid, content, digest = m
- if path in old_files:
- match = old_files[path] == (digest, mode, uid, gid)
- del old_files[path]
- if match:
- continue
- self.log.info(f'Updating {host}:{path}')
- self._write_remote_file(host, path, content, mode, uid, gid)
- self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
- updated_files = True
- for path in old_files.keys():
- self.log.info(f'Removing {host}:{path}')
- with self._remote_connection(host) as tpl:
- conn, connr = tpl
- out, err, code = remoto.process.check(
- conn,
- ['rm', '-f', path])
- updated_files = True
- self.mgr.cache.removed_client_file(host, path)
- if updated_files:
- self.mgr.cache.save_host(host)
+ self._write_client_files(client_files, host)
refresh(self.mgr.cache.get_hosts())
+ self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
+
self.mgr.config_checker.run_checks()
for k in [
'CEPHADM_HOST_CHECK_FAILED',
- 'CEPHADM_FAILED_DAEMON',
'CEPHADM_REFRESH_FAILED',
]:
self.mgr.remove_health_warning(k)
if bad_hosts:
- self.mgr.set_health_warning('CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
+ self.mgr.set_health_warning(
+ 'CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
if failures:
- self.mgr.set_health_warning('CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
- failed_daemons = []
- for dd in self.mgr.cache.get_daemons():
- if dd.status is not None and dd.status == DaemonDescriptionStatus.error:
- failed_daemons.append('daemon %s on %s is in %s state' % (
- dd.name(), dd.hostname, dd.status_desc
- ))
- if failed_daemons:
- self.mgr.set_health_warning('CEPHADM_FAILED_DAEMON', f'{len(failed_daemons)} failed cephadm daemon(s)', len(failed_daemons), failed_daemons)
+ self.mgr.set_health_warning(
+ 'CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
+ self.mgr.update_failed_daemon_health_check()
def _check_host(self, host: str) -> Optional[str]:
if host not in self.mgr.inventory:
self.log.debug(' checking %s' % host)
try:
addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
- out, err, code = self._run_cephadm(
+ out, err, code = self.mgr.wait_async(self._run_cephadm(
host, cephadmNoImage, 'check-host', [],
- error_ok=True, no_fsid=True)
+ error_ok=True, no_fsid=True))
self.mgr.cache.update_last_host_check(host)
self.mgr.cache.save_host(host)
if code:
def _refresh_host_daemons(self, host: str) -> Optional[str]:
try:
- ls = self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)
+ ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True))
except OrchestratorError as e:
return str(e)
- dm = {}
- for d in ls:
- if not d['style'].startswith('cephadm'):
- continue
- if d['fsid'] != self.mgr._cluster_fsid:
- continue
- if '.' not in d['name']:
- continue
- sd = orchestrator.DaemonDescription()
- sd.last_refresh = datetime_now()
- for k in ['created', 'started', 'last_configured', 'last_deployed']:
- v = d.get(k, None)
- if v:
- setattr(sd, k, str_to_datetime(d[k]))
- sd.daemon_type = d['name'].split('.')[0]
- if sd.daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
- logger.warning(f"Found unknown daemon type {sd.daemon_type} on host {host}")
- continue
-
- sd.daemon_id = '.'.join(d['name'].split('.')[1:])
- sd.hostname = host
- sd.container_id = d.get('container_id')
- if sd.container_id:
- # shorten the hash
- sd.container_id = sd.container_id[0:12]
- sd.container_image_name = d.get('container_image_name')
- sd.container_image_id = d.get('container_image_id')
- sd.container_image_digests = d.get('container_image_digests')
- sd.memory_usage = d.get('memory_usage')
- sd.memory_request = d.get('memory_request')
- sd.memory_limit = d.get('memory_limit')
- sd._service_name = d.get('service_name')
- sd.deployed_by = d.get('deployed_by')
- sd.version = d.get('version')
- sd.ports = d.get('ports')
- sd.ip = d.get('ip')
- sd.rank = int(d['rank']) if d.get('rank') is not None else None
- sd.rank_generation = int(d['rank_generation']) if d.get(
- 'rank_generation') is not None else None
- if sd.daemon_type == 'osd':
- sd.osdspec_affinity = self.mgr.osd_service.get_osdspec_affinity(sd.daemon_id)
- if 'state' in d:
- sd.status_desc = d['state']
- sd.status = {
- 'running': DaemonDescriptionStatus.running,
- 'stopped': DaemonDescriptionStatus.stopped,
- 'error': DaemonDescriptionStatus.error,
- 'unknown': DaemonDescriptionStatus.error,
- }[d['state']]
- else:
- sd.status_desc = 'unknown'
- sd.status = None
- dm[sd.name()] = sd
- self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
- self.mgr.cache.update_host_daemons(host, dm)
- self.mgr.cache.save_host(host)
+ self.mgr._process_ls_output(host, ls)
return None
def _refresh_facts(self, host: str) -> Optional[str]:
try:
- val = self._run_cephadm_json(host, cephadmNoImage, 'gather-facts', [], no_fsid=True)
+ val = self.mgr.wait_async(self._run_cephadm_json(
+ host, cephadmNoImage, 'gather-facts', [], no_fsid=True))
except OrchestratorError as e:
return str(e)
return None
def _refresh_host_devices(self, host: str) -> Optional[str]:
-
- with_lsm = self.mgr.get_module_option('device_enhanced_scan')
+ with_lsm = self.mgr.device_enhanced_scan
inventory_args = ['--', 'inventory',
'--format=json-pretty',
'--filter-for-batch']
try:
try:
- devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
- inventory_args)
+ devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
+ inventory_args))
except OrchestratorError as e:
if 'unrecognized arguments: --filter-for-batch' in str(e):
rerun_args = inventory_args.copy()
rerun_args.remove('--filter-for-batch')
- devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
- rerun_args)
+ devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
+ rerun_args))
else:
raise
- networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True)
except OrchestratorError as e:
return str(e)
- self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
- host, len(devices), len(networks)))
+ self.log.debug('Refreshed host %s devices (%d)' % (
+ host, len(devices)))
ret = inventory.Devices.from_json(devices)
- self.mgr.cache.update_host_devices_networks(host, ret.devices, networks)
+ self.mgr.cache.update_host_devices(host, ret.devices)
self.update_osdspec_previews(host)
self.mgr.cache.save_host(host)
return None
+ def _refresh_host_networks(self, host: str) -> Optional[str]:
+ try:
+ networks = self.mgr.wait_async(self._run_cephadm_json(
+ host, 'mon', 'list-networks', [], no_fsid=True))
+ except OrchestratorError as e:
+ return str(e)
+
+ self.log.debug('Refreshed host %s networks (%s)' % (
+ host, len(networks)))
+ self.mgr.cache.update_host_networks(host, networks)
+ self.mgr.cache.save_host(host)
+ return None
+
def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
self.update_osdspec_previews(host)
self.mgr.cache.save_host(host)
s.get('type'), s.get('id')
)
)
-
+ if s.get('type') == 'tcmu-runner':
+ # because we don't track tcmu-runner daemons in the host cache
+ # and have no way to check whether a given daemon is part of an iscsi service,
+ # we assume that all tcmu-runner daemons are managed by cephadm
+ managed.append(name)
if host not in self.mgr.inventory:
missing_names.append(name)
host_num_daemons += 1
'stray host %s has %d stray daemons: %s' % (
host, len(missing_names), missing_names))
if self.mgr.warn_on_stray_hosts and host_detail:
- self.mgr.set_health_warning('CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
+ self.mgr.set_health_warning(
+ 'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
if self.mgr.warn_on_stray_daemons and daemon_detail:
- self.mgr.set_health_warning('CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
+ self.mgr.set_health_warning(
+ 'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
+
+ def _check_for_moved_osds(self) -> None:
+ self.log.debug('_check_for_moved_osds')
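+ # an OSD that was physically moved to another host can be reported by both
+ # the old and the new host; find OSD ids with duplicate daemon entries and
+ # remove the stale ones that are in the error state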
+ all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
+ for dd in self.mgr.cache.get_daemons_by_type('osd'):
+ assert dd.daemon_id
+ all_osds[int(dd.daemon_id)].append(dd)
+ for osd_id, dds in all_osds.items():
+ if len(dds) <= 1:
+ continue
+ running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
+ error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
+ msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
+ logger.info(msg)
+ if len(running) != 1:
+ continue
+ osd = self.mgr.get_osd_by_id(osd_id)
+ if not osd or not osd['up']:
+ continue
+ for e in error:
+ assert e.hostname
+ try:
+ self._remove_daemon(e.name(), e.hostname, no_post_remove=True)
+ self.mgr.events.for_daemon(
+ e.name(), 'INFO', f"Removed duplicated daemon on host '{e.hostname}'")
+ except OrchestratorError as ex:
+ self.mgr.events.from_orch_error(ex)
+ logger.exception(f'failed to remove duplicated daemon {e}')
def _apply_all_services(self) -> bool:
+ self.log.debug('_apply_all_services')
r = False
specs = [] # type: List[ServiceSpec]
- for sn, spec in self.mgr.spec_store.active_specs.items():
- specs.append(spec)
+ # if metadata is not up to date, we still need to apply the spec for the agent,
+ # since the agent is the one that gathers the metadata. If we don't, we end
+ # up stuck between wanting metadata to be up to date in order to apply specs
+ # and needing to apply the agent spec to get up-to-date metadata
+ if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
+ self.log.info('Metadata not up to date on all hosts. Skipping non-agent specs')
+ try:
+ specs.append(self.mgr.spec_store['agent'].spec)
+ except Exception as e:
+ self.log.debug(f'Failed to find agent spec: {e}')
+ self.mgr.agent_helpers._apply_agent()
+ return r
+ else:
+ for sn, spec in self.mgr.spec_store.active_specs.items():
+ specs.append(spec)
for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
self.mgr.remove_health_warning(name)
self.mgr.apply_spec_fails = []
options_failed_to_set.append(msg)
if invalid_config_options:
- self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(invalid_config_options), invalid_config_options)
+ self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(
+ invalid_config_options), invalid_config_options)
if options_failed_to_set:
- self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(options_failed_to_set), options_failed_to_set)
+ self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(
+ options_failed_to_set), options_failed_to_set)
def _apply_service(self, spec: ServiceSpec) -> bool:
"""
return False
self.log.debug('Applying service %s spec' % service_name)
+ if service_type == 'agent':
+ try:
+ assert self.mgr.cherrypy_thread
+ assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+ except Exception:
+ self.log.info(
+ 'Delaying applying agent spec until cephadm endpoint root cert created')
+ return False
+
self._apply_service_config(spec)
if service_type == 'osd':
rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
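+ # the agent should land on every non-draining host, while other services
+ # are limited to the schedulable hosts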
ha = HostAssignment(
spec=spec,
- hosts=self.mgr._schedulable_hosts(),
- unreachable_hosts=self.mgr._unreachable_hosts(),
+ hosts=self.mgr.cache.get_non_draining_hosts() if spec.service_name(
+ ) == 'agent' else self.mgr.cache.get_schedulable_hosts(),
+ unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
daemons=daemons,
networks=self.mgr.cache.networks,
filter_new_host=(
self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)
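+ # track every host whose daemons we add or remove so their agents can be
+ # asked to refresh metadata once we're done (see the finally block below)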
+ hosts_altered: Set[str] = set()
+
try:
# assign names
for i in range(len(slots_to_add)):
self._remove_daemon(d.name(), d.hostname)
daemons_to_remove.remove(d)
progress_done += 1
+ hosts_altered.add(d.hostname)
break
# deploy new daemon
try:
daemon_spec = svc.prepare_create(daemon_spec)
- self._create_daemon(daemon_spec)
+ self.mgr.wait_async(self._create_daemon(daemon_spec))
r = True
progress_done += 1
update_progress()
+ hosts_altered.add(daemon_spec.host)
except (RuntimeError, OrchestratorError) as e:
msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
f"on {slot.hostname}: {e}")
daemons.append(sd)
if daemon_place_fails:
- self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(daemon_place_fails), daemon_place_fails)
+ self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(
+ daemon_place_fails), daemon_place_fails)
# remove any?
def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
progress_done += 1
update_progress()
+ hosts_altered.add(d.hostname)
self.mgr.remote('progress', 'complete', progress_id)
except Exception as e:
self.mgr.remote('progress', 'fail', progress_id, str(e))
raise
+ finally:
+ if self.mgr.use_agent:
+ # can only send an ack to agents if we know for sure which port they bound to
+ hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [
+ h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])])
+ self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True)
if r is None:
r = False
return r
def _check_daemons(self) -> None:
-
+ self.log.debug('_check_daemons')
daemons = self.mgr.cache.get_daemons()
daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
for dd in daemons:
if dd.service_name() in self.mgr.spec_store.spec_deleted:
continue
+ if dd.daemon_type == 'agent':
+ try:
+ self.mgr.agent_helpers._check_agent(dd.hostname)
+ except Exception as e:
+ self.log.debug(
+ f'Agent {dd.name()} could not be checked in _check_daemons: {e}')
+ continue
+
# These daemon types require additional configs after creation
if dd.daemon_type in REQUIRES_POST_ACTIONS:
daemons_post[dd.daemon_type].append(dd)
self.log.info('Reconfiguring %s (dependencies changed)...' % (
dd.name()))
action = 'reconfig'
+ elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
+ self.log.debug(
+ f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
+ self.log.info(f'Redeploying {dd.name()} (container cli args changed)...')
+ dd.extra_container_args = spec.extra_container_args
+ action = 'redeploy'
elif self.mgr.last_monmap and \
self.mgr.last_monmap > last_config and \
dd.daemon_type in CEPH_TYPES:
try:
daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
self.mgr._daemon_action(daemon_spec, action=action)
- self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name())
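+ # only persist the host cache if a scheduled action was actually removed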
+ if self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name()):
+ self.mgr.cache.save_host(dd.hostname)
except OrchestratorError as e:
self.mgr.events.from_orch_error(e)
if dd.daemon_type in daemons_post:
daemon_type)).daemon_check_post(daemon_descs)
def _purge_deleted_services(self) -> None:
+ self.log.debug('_purge_deleted_services')
existing_services = self.mgr.spec_store.all_specs.items()
for service_name, spec in list(existing_services):
if service_name not in self.mgr.spec_store.spec_deleted:
digests: Dict[str, ContainerInspectInfo] = {}
for container_image_ref in set(settings.values()):
if not is_repo_digest(container_image_ref):
- image_info = self._get_container_image_info(container_image_ref)
+ image_info = self.mgr.wait_async(
+ self._get_container_image_info(container_image_ref))
if image_info.repo_digests:
# FIXME: we assume the first digest here is the best
assert is_repo_digest(image_info.repo_digests[0]), image_info
# FIXME: we assume the first digest here is the best
self.mgr.set_container_image(entity, image_info.repo_digests[0])
- def _create_daemon(self,
- daemon_spec: CephadmDaemonDeploySpec,
- reconfig: bool = False,
- osd_uuid_map: Optional[Dict[str, Any]] = None,
- ) -> str:
+ def _calc_client_files(self) -> Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]]:
+ # host -> path -> (mode, uid, gid, content, digest)
+ client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}
+
+ # ceph.conf
+ config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
+ config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())
+
+ if self.mgr.manage_etc_ceph_ceph_conf:
+ try:
+ pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
+ ha = HostAssignment(
+ spec=ServiceSpec('mon', placement=pspec),
+ hosts=self.mgr.cache.get_schedulable_hosts(),
+ unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
+ daemons=[],
+ networks=self.mgr.cache.networks,
+ )
+ all_slots, _, _ = ha.place()
+ for host in {s.hostname for s in all_slots}:
+ if host not in client_files:
+ client_files[host] = {}
+ client_files[host]['/etc/ceph/ceph.conf'] = (
+ 0o644, 0, 0, bytes(config), str(config_digest)
+ )
+ except Exception as e:
+ self.mgr.log.warning(
+ f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')
+
+ # client keyrings
+ for ks in self.mgr.keys.keys.values():
+ try:
+ ret, keyring, err = self.mgr.mon_command({
+ 'prefix': 'auth get',
+ 'entity': ks.entity,
+ })
+ if ret:
+ self.log.warning(f'unable to fetch keyring for {ks.entity}')
+ continue
+ digest = ''.join('%02x' % c for c in hashlib.sha256(
+ keyring.encode('utf-8')).digest())
+ ha = HostAssignment(
+ spec=ServiceSpec('mon', placement=ks.placement),
+ hosts=self.mgr.cache.get_schedulable_hosts(),
+ unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
+ daemons=[],
+ networks=self.mgr.cache.networks,
+ )
+ all_slots, _, _ = ha.place()
+ for host in {s.hostname for s in all_slots}:
+ if host not in client_files:
+ client_files[host] = {}
+ client_files[host]['/etc/ceph/ceph.conf'] = (
+ 0o644, 0, 0, bytes(config), str(config_digest)
+ )
+ client_files[host][ks.path] = (
+ ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest
+ )
+ except Exception as e:
+ self.log.warning(
+ f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
+ return client_files
+
+ def _write_client_files(self,
+ client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]],
+ host: str) -> None:
+ updated_files = False
+ old_files = self.mgr.cache.get_host_client_files(host).copy()
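+ # compare each desired file's (digest, mode, uid, gid) against the cached
+ # state and rewrite only what changed; whatever remains in old_files
+ # afterwards is no longer wanted and gets removed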
+ for path, m in client_files.get(host, {}).items():
+ mode, uid, gid, content, digest = m
+ if path in old_files:
+ match = old_files[path] == (digest, mode, uid, gid)
+ del old_files[path]
+ if match:
+ continue
+ self.log.info(f'Updating {host}:{path}')
+ self.mgr.ssh.write_remote_file(host, path, content, mode, uid, gid)
+ self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
+ updated_files = True
+ for path in old_files.keys():
+ self.log.info(f'Removing {host}:{path}')
+ cmd = ['rm', '-f', path]
+ self.mgr.ssh.check_execute_command(host, cmd)
+ updated_files = True
+ self.mgr.cache.removed_client_file(host, path)
+ if updated_files:
+ self.mgr.cache.save_host(host)
+
+ async def _create_daemon(self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ reconfig: bool = False,
+ osd_uuid_map: Optional[Dict[str, Any]] = None,
+ ) -> str:
with set_exception_subject('service', orchestrator.DaemonDescription(
daemon_type=daemon_spec.daemon_type,
if spec.ports:
ports.extend(spec.ports)
- if daemon_spec.daemon_type == 'cephadm-exporter':
- if not reconfig:
- assert daemon_spec.host
- self._deploy_cephadm_binary(daemon_spec.host)
-
# TCP port to open in the host firewall
if len(ports) > 0:
daemon_spec.extra_args.extend([
if self.mgr.allow_ptrace:
daemon_spec.extra_args.append('--allow-ptrace')
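+ # not every spec type carries extra_container_args; treat a missing
+ # attribute as no extra args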
+ try:
+ eca = daemon_spec.extra_container_args
+ if eca:
+ for a in eca:
+ daemon_spec.extra_args.append(f'--extra-container-args={a}')
+ except AttributeError:
+ eca = None
+
if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
- self._registry_login(daemon_spec.host, self.mgr.registry_url,
- self.mgr.registry_username, self.mgr.registry_password)
+ await self._registry_login(daemon_spec.host, json.loads(str(self.mgr.get_store('registry_credentials'))))
self.log.info('%s daemon %s on %s' % (
'Reconfiguring' if reconfig else 'Deploying',
daemon_spec.name(), daemon_spec.host))
- out, err, code = self._run_cephadm(
+ out, err, code = await self._run_cephadm(
daemon_spec.host, daemon_spec.name(), 'deploy',
[
'--name', daemon_spec.name(),
'deployed_by': self.mgr.get_active_mgr_digests(),
'rank': daemon_spec.rank,
'rank_generation': daemon_spec.rank_generation,
+ 'extra_container_args': eca
}),
'--config-json', '-',
] + daemon_spec.extra_args,
stdin=json.dumps(daemon_spec.final_config),
- image=image)
+ image=image,
+ )
+
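+ # seed the agent cache for the freshly deployed agent (so it is not
+ # immediately treated as down before its first report)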
+ if daemon_spec.daemon_type == 'agent':
+ self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now()
+ self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1
# refresh daemon state? (ceph daemon reconfig does not need it)
if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
# prime cached service state with what we (should have)
# just created
sd = daemon_spec.to_daemon_description(
- DaemonDescriptionStatus.running, 'starting')
+ DaemonDescriptionStatus.starting, 'starting')
self.mgr.cache.add_daemon(daemon_spec.host, sd)
if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS:
self.mgr.requires_post_actions.add(daemon_spec.name())
self.mgr.cache.invalidate_host_daemons(daemon_spec.host)
- self.mgr.cache.update_daemon_config_deps(
- daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
- self.mgr.cache.save_host(daemon_spec.host)
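+ # agent config deps live in the dedicated agent cache rather than the
+ # per-host daemon cache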
+ if daemon_spec.daemon_type != 'agent':
+ self.mgr.cache.update_daemon_config_deps(
+ daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
+ self.mgr.cache.save_host(daemon_spec.host)
+ else:
+ self.mgr.agent_cache.update_agent_config_deps(
+ daemon_spec.host, daemon_spec.deps, start_time)
+ self.mgr.agent_cache.save_agent(daemon_spec.host)
msg = "{} {} on host '{}'".format(
'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
if not code:
self.mgr.cephadm_services[servict_type].post_remove(dd, is_failed_deploy=True)
raise
- def _remove_daemon(self, name: str, host: str) -> str:
+ def _remove_daemon(self, name: str, host: str, no_post_remove: bool = False) -> str:
"""
Remove a daemon
"""
# we can delete a mon instances data.
args = ['--name', name, '--force']
self.log.info('Removing daemon %s from %s' % (name, host))
- out, err, code = self._run_cephadm(
- host, name, 'rm-daemon', args)
+ out, err, code = self.mgr.wait_async(self._run_cephadm(
+ host, name, 'rm-daemon', args))
if not code:
# remove item from cache
self.mgr.cache.rm_daemon(host, name)
self.mgr.cache.invalidate_host_daemons(host)
- self.mgr.cephadm_services[daemon_type_to_service(
- daemon_type)].post_remove(daemon, is_failed_deploy=False)
+ if not no_post_remove:
+ self.mgr.cephadm_services[daemon_type_to_service(
+ daemon_type)].post_remove(daemon, is_failed_deploy=False)
return "Removed {} from host '{}'".format(name, host)
- def _run_cephadm_json(self,
- host: str,
- entity: Union[CephadmNoImage, str],
- command: str,
- args: List[str],
- no_fsid: Optional[bool] = False,
- image: Optional[str] = "",
- ) -> Any:
+ async def _run_cephadm_json(self,
+ host: str,
+ entity: Union[CephadmNoImage, str],
+ command: str,
+ args: List[str],
+ no_fsid: Optional[bool] = False,
+ image: Optional[str] = "",
+ ) -> Any:
try:
- out, err, code = self._run_cephadm(
+ out, err, code = await self._run_cephadm(
host, entity, command, args, no_fsid=no_fsid, image=image)
if code:
raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
self.log.exception(f'{msg}: {"".join(out)}')
raise OrchestratorError(msg)
- def _run_cephadm(self,
- host: str,
- entity: Union[CephadmNoImage, str],
- command: str,
- args: List[str],
- addr: Optional[str] = "",
- stdin: Optional[str] = "",
- no_fsid: Optional[bool] = False,
- error_ok: Optional[bool] = False,
- image: Optional[str] = "",
- env_vars: Optional[List[str]] = None,
- ) -> Tuple[List[str], List[str], int]:
+ async def _run_cephadm(self,
+ host: str,
+ entity: Union[CephadmNoImage, str],
+ command: str,
+ args: List[str],
+ addr: Optional[str] = "",
+ stdin: Optional[str] = "",
+ no_fsid: Optional[bool] = False,
+ error_ok: Optional[bool] = False,
+ image: Optional[str] = "",
+ env_vars: Optional[List[str]] = None,
+ ) -> Tuple[List[str], List[str], int]:
"""
Run cephadm on the remote host with the given command + args
:env_vars: in format -> [KEY=VALUE, ..]
"""
+
+ await self.mgr.ssh._remote_connection(host, addr)
+
self.log.debug(f"_run_cephadm : command = {command}")
self.log.debug(f"_run_cephadm : args = {args}")
- bypass_image = ('cephadm-exporter',)
+ bypass_image = ('agent',)
- with self._remote_connection(host, addr) as tpl:
- conn, connr = tpl
- assert image or entity
- # Skip the image check for daemons deployed that are not ceph containers
- if not str(entity).startswith(bypass_image):
- if not image and entity is not cephadmNoImage:
- image = self.mgr._get_container_image(entity)
+ assert image or entity
+ # Skip the image check for daemons deployed that are not ceph containers
+ if not str(entity).startswith(bypass_image):
+ if not image and entity is not cephadmNoImage:
+ image = self.mgr._get_container_image(entity)
- final_args = []
+ final_args = []
- # global args
- if env_vars:
- for env_var_pair in env_vars:
- final_args.extend(['--env', env_var_pair])
+ # global args
+ if env_vars:
+ for env_var_pair in env_vars:
+ final_args.extend(['--env', env_var_pair])
- if image:
- final_args.extend(['--image', image])
+ if image:
+ final_args.extend(['--image', image])
- if not self.mgr.container_init:
- final_args += ['--no-container-init']
+ if not self.mgr.container_init:
+ final_args += ['--no-container-init']
- # subcommand
- final_args.append(command)
+ # subcommand
+ final_args.append(command)
- # subcommand args
- if not no_fsid:
- final_args += ['--fsid', self.mgr._cluster_fsid]
+ # subcommand args
+ if not no_fsid:
+ final_args += ['--fsid', self.mgr._cluster_fsid]
- final_args += args
+ final_args += args
- # exec
- self.log.debug('args: %s' % (' '.join(final_args)))
- if self.mgr.mode == 'root':
- if stdin:
- self.log.debug('stdin: %s' % stdin)
+ # exec
+ self.log.debug('args: %s' % (' '.join(final_args)))
+ if self.mgr.mode == 'root':
+ # the agent is deployed with the cephadm binary as an extra file, which is
+ # passed over stdin; that is too much output even for debug logs
+ if stdin and 'agent' not in str(entity):
+ self.log.debug('stdin: %s' % stdin)
- python = connr.choose_python()
- if not python:
- raise RuntimeError(
- 'unable to find python on %s (tried %s in %s)' % (
- host, remotes.PYTHONS, remotes.PATH))
- try:
- out, err, code = remoto.process.check(
- conn,
- [python, self.mgr.cephadm_binary_path] + final_args,
- stdin=stdin.encode('utf-8') if stdin is not None else None)
- if code == 2:
- out_ls, err_ls, code_ls = remoto.process.check(
- conn, ['ls', self.mgr.cephadm_binary_path])
- if code_ls == 2:
- self._deploy_cephadm_binary_conn(conn, host)
- out, err, code = remoto.process.check(
- conn,
- [python, self.mgr.cephadm_binary_path] + final_args,
- stdin=stdin.encode('utf-8') if stdin is not None else None)
-
- except RuntimeError as e:
- self.mgr._reset_con(host)
- if error_ok:
- return [], [str(e)], 1
- raise
+ cmd = ['which', 'python3']
+ python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
+ cmd = [python, self.mgr.cephadm_binary_path] + final_args
- elif self.mgr.mode == 'cephadm-package':
- try:
- out, err, code = remoto.process.check(
- conn,
- ['sudo', '/usr/bin/cephadm'] + final_args,
- stdin=stdin)
- except RuntimeError as e:
- self.mgr._reset_con(host)
- if error_ok:
- return [], [str(e)], 1
- raise
- else:
- assert False, 'unsupported mode'
-
- self.log.debug('code: %d' % code)
- if out:
- self.log.debug('out: %s' % '\n'.join(out))
- if err:
- self.log.debug('err: %s' % '\n'.join(err))
- if code and not error_ok:
- raise OrchestratorError(
- 'cephadm exited with an error code: %d, stderr:%s' % (
- code, '\n'.join(err)))
- return out, err, code
-
- def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
+ try:
+ out, err, code = await self.mgr.ssh._execute_command(
+ host, cmd, stdin=stdin, addr=addr)
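+ # exit code 2 from the interpreter typically means the cephadm binary is
+ # missing on the host; verify with ls and redeploy it before retrying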
+ if code == 2:
+ ls_cmd = ['ls', self.mgr.cephadm_binary_path]
+ out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr)
+ if code_ls == 2:
+ await self._deploy_cephadm_binary(host, addr)
+ out, err, code = await self.mgr.ssh._execute_command(
+ host, cmd, stdin=stdin, addr=addr)
+ # if there is an agent on this host, make sure it is using the most recent
+ # version of the cephadm binary
+ if host in self.mgr.inventory:
+ for agent in self.mgr.cache.get_daemons_by_type('agent', host):
+ self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
+
+ except Exception as e:
+ await self.mgr.ssh._reset_con(host)
+ if error_ok:
+ return [], [str(e)], 1
+ raise
+
+ elif self.mgr.mode == 'cephadm-package':
+ try:
+ cmd = ['/usr/bin/cephadm'] + final_args
+ out, err, code = await self.mgr.ssh._execute_command(
+ host, cmd, stdin=stdin, addr=addr)
+ except Exception as e:
+ await self.mgr.ssh._reset_con(host)
+ if error_ok:
+ return [], [str(e)], 1
+ raise
+ else:
+ assert False, 'unsupported mode'
+
+ self.log.debug(f'code: {code}')
+ if out:
+ self.log.debug(f'out: {out}')
+ if err:
+ self.log.debug(f'err: {err}')
+ if code and not error_ok:
+ raise OrchestratorError(
+ f'cephadm exited with an error code: {code}, stderr: {err}')
+ return [out], [err], code
+
+ async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
# pick a random host...
host = None
for host_name in self.mgr.inventory.keys():
if not host:
raise OrchestratorError('no hosts defined')
if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
- self._registry_login(host, self.mgr.registry_url,
- self.mgr.registry_username, self.mgr.registry_password)
+ await self._registry_login(host, json.loads(str(self.mgr.get_store('registry_credentials'))))
pullargs: List[str] = []
if self.mgr.registry_insecure:
pullargs.append("--insecure")
- j = self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True)
+ j = await self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True)
r = ContainerInspectInfo(
j['image_id'],
return r
# function responsible for logging single host into custom registry
- def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
- self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
+ async def _registry_login(self, host: str, registry_json: Dict[str, str]) -> Optional[str]:
+ self.log.debug(
+ f"Attempting to log host {host} into custom registry @ {registry_json['url']}")
# want to pass info over stdin rather than through normal list of args
- args_str = json.dumps({
- 'url': url,
- 'username': username,
- 'password': password,
- })
- out, err, code = self._run_cephadm(
+ out, err, code = await self._run_cephadm(
host, 'mon', 'registry-login',
- ['--registry-json', '-'], stdin=args_str, error_ok=True)
+ ['--registry-json', '-'], stdin=json.dumps(registry_json), error_ok=True)
if code:
- return f"Host {host} failed to login to {url} as {username} with given password"
+ return f"Host {host} failed to login to {registry_json['url']} as {registry_json['username']} with given password"
return None
- def _deploy_cephadm_binary(self, host: str) -> None:
+ async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
- # Use tee (from coreutils) to create a copy of cephadm on the target machine
+ # write a copy of the cephadm binary to the target machine
self.log.info(f"Deploying cephadm binary to {host}")
- with self._remote_connection(host) as tpl:
- conn, _connr = tpl
- return self._deploy_cephadm_binary_conn(conn, host)
-
- def _deploy_cephadm_binary_conn(self, conn: "BaseConnection", host: str) -> None:
- _out, _err, code = remoto.process.check(
- conn,
- ['mkdir', '-p', f'/var/lib/ceph/{self.mgr._cluster_fsid}'])
- if code:
- msg = f"Unable to deploy the cephadm binary to {host}: {_err}"
- self.log.warning(msg)
- raise OrchestratorError(msg)
- _out, _err, code = remoto.process.check(
- conn,
- ['tee', '-', self.mgr.cephadm_binary_path],
- stdin=self.mgr._cephadm.encode('utf-8'))
- if code:
- msg = f"Unable to deploy the cephadm binary to {host}: {_err}"
- self.log.warning(msg)
- raise OrchestratorError(msg)
-
- def _write_remote_file(self,
- host: str,
- path: str,
- content: bytes,
- mode: int,
- uid: int,
- gid: int) -> None:
- with self._remote_connection(host) as tpl:
- conn, connr = tpl
- try:
- errmsg = connr.write_file(path, content, mode, uid, gid)
- if errmsg is not None:
- raise OrchestratorError(errmsg)
- except Exception as e:
- msg = f"Unable to write {host}:{path}: {e}"
- self.log.warning(msg)
- raise OrchestratorError(msg)
-
- @contextmanager
- def _remote_connection(self,
- host: str,
- addr: Optional[str] = None,
- ) -> Iterator[Tuple["BaseConnection", Any]]:
- if not addr and host in self.mgr.inventory:
- addr = self.mgr.inventory.get_addr(host)
-
- self.mgr.offline_hosts_remove(host)
-
- try:
- try:
- if not addr:
- raise OrchestratorError("host address is empty")
- conn, connr = self.mgr._get_connection(addr)
- except OSError as e:
- self.mgr._reset_con(host)
- msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
- raise execnet.gateway_bootstrap.HostNotFound(msg)
-
- yield (conn, connr)
-
- except execnet.gateway_bootstrap.HostNotFound as e:
- # this is a misleading exception as it seems to be thrown for
- # any sort of connection failure, even those having nothing to
- # do with "host not found" (e.g., ssh key permission denied).
- self.mgr.offline_hosts.add(host)
- self.mgr._reset_con(host)
-
- user = self.mgr.ssh_user if self.mgr.mode == 'root' else 'cephadm'
- if str(e).startswith("Can't communicate"):
- msg = str(e)
- else:
- msg = f'''Failed to connect to {host} ({addr}).
-Please make sure that the host is reachable and accepts connections using the cephadm SSH key
-
-To add the cephadm SSH key to the host:
-> ceph cephadm get-pub-key > ~/ceph.pub
-> ssh-copy-id -f -i ~/ceph.pub {user}@{addr}
-
-To check that the host is reachable open a new shell with the --no-hosts flag:
-> cephadm shell --no-hosts
-
-Then run the following:
-> ceph cephadm get-ssh-config > ssh_config
-> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
-> chmod 0600 ~/cephadm_private_key
-> ssh -F ssh_config -i ~/cephadm_private_key {user}@{addr}'''
- raise OrchestratorError(msg) from e
- except Exception as ex:
- self.log.exception(ex)
- raise
+ await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
+ self.mgr._cephadm.encode('utf-8'), addr=addr)