# ceph/src/pybind/mgr/rook/module.py
import datetime
import json
import os
import re
import threading

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
from ceph.utils import datetime_now

from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple, TYPE_CHECKING

try:
    from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
    pass  # just for type checking
try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    kubernetes_imported = True

    # https://github.com/kubernetes-client/python/issues/895
    from kubernetes.client.models.v1_container_image import V1ContainerImage

    def names(self: Any, names: Any) -> None:
        self._names = names
    V1ContainerImage.names = V1ContainerImage.names.setter(names)
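    # Note: the setter override above relaxes the kubernetes client's validation so
    # that node status entries whose container images report no names (see the issue
    # linked above) do not raise during deserialization; without it, listing nodes on
    # some clusters can fail inside the client library.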
except ImportError:
    kubernetes_imported = False
import orchestrator
from mgr_module import MgrModule, Option, NFS_POOL_NAME

from orchestrator import handle_orch_error, OrchResult, raise_if_exception

from .rook_cluster import RookCluster


FuncT = TypeVar('FuncT', bound=Callable)
ServiceSpecT = TypeVar('ServiceSpecT', bound=ServiceSpec)
class RookEnv(object):
    def __init__(self) -> None:
        # POD_NAMESPACE already exists for Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self) -> bool:
        return self.crd_version == 'v1'

    def has_namespace(self) -> bool:
        return 'POD_NAMESPACE' in os.environ
class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    Writes are a two-phase thing: first we send the write to the k8s API
    (fast), then we wait for the corresponding change to appear in the
    Ceph cluster (slow).

    Right now, we are calling the k8s API synchronously.
    """
    MODULE_OPTIONS: List[Option] = [
        # TODO: configure k8s API addr instead of assuming local
        Option(
            'storage_class',
            type='str',
            default='local',
            desc='storage class name for LSO-discovered PVs',
        ),
        Option(
            'drive_group_interval',
            type='float',
            default=300.0,
            desc='interval in seconds between re-application of applied drive_groups',
        ),
    ]
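    # These options are re-read in config_notify() below.  They can be changed at
    # runtime through the usual mgr option mechanism, e.g.
    #   ceph config set mgr mgr/rook/storage_class <storage-class-name>
    #   ceph config set mgr mgr/rook/drive_group_interval 60
    # (commands shown for illustration; the defaults above reflect one release and
    # may differ in yours).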
    @staticmethod
    def can_run() -> Tuple[bool, str]:
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        if not RookEnv().api_version_match():
            return False, "Rook version unsupported."
        return True, ''
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found", {}
        elif not self._rook_env.has_namespace():
            return False, "ceph-mgr not running in Rook cluster", {}

        try:
            self.k8s.list_namespaced_pod(self._rook_env.namespace)
        except ApiException as e:
            return False, "Cannot reach Kubernetes API: {}".format(e), {}
        else:
            return True, "", {}
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()

        self._k8s_CoreV1_api: Optional[client.CoreV1Api] = None
        self._k8s_BatchV1_api: Optional[client.BatchV1Api] = None
        self._k8s_CustomObjects_api: Optional[client.CustomObjectsApi] = None
        self._k8s_StorageV1_api: Optional[client.StorageV1Api] = None
        self._rook_cluster: Optional[RookCluster] = None
        self._rook_env = RookEnv()
        self._k8s_AppsV1_api: Optional[client.AppsV1Api] = None

        # Placeholder values; the real values are read back in config_notify().
        self.storage_class = 'foo'
        self.drive_group_interval = 10.0

        self._load_drive_groups()
        self._shutdown = threading.Event()
    def config_notify(self) -> None:
        """
        This method is called whenever one of our config options is changed.

        TODO: this method should be moved into mgr_module.py
        """
        # Copy each MODULE_OPTIONS value onto an attribute of the same name.
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        assert isinstance(self.storage_class, str)
        assert isinstance(self.drive_group_interval, float)

        if self._rook_cluster:
            self._rook_cluster.storage_class = self.storage_class
    def shutdown(self) -> None:
        self._shutdown.set()

    @property
    def k8s(self):
        # type: () -> client.CoreV1Api
        self._initialized.wait()
        assert self._k8s_CoreV1_api is not None
        return self._k8s_CoreV1_api

    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        self._initialized.wait()
        assert self._rook_cluster is not None
        return self._rook_cluster
    def serve(self) -> None:
        # For deployed clusters, we should always be running inside
        # a Rook cluster.  For development convenience, also support
        # running outside (reading ~/.kube config).

        if self._rook_env.has_namespace():
            config.load_incluster_config()
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False

        self._k8s_CoreV1_api = client.CoreV1Api()
        self._k8s_BatchV1_api = client.BatchV1Api()
        self._k8s_CustomObjects_api = client.CustomObjectsApi()
        self._k8s_StorageV1_api = client.StorageV1Api()
        self._k8s_AppsV1_api = client.AppsV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake').  Suspect some kind of
            # thread context setup in SSL lib?
            self._k8s_CoreV1_api.list_namespaced_pod(self._rook_env.namespace)
        except ApiException:
            # Ignore here to make self.available() fail with a proper error message
            pass

        assert isinstance(self.storage_class, str)

        self._rook_cluster = RookCluster(
            self._k8s_CoreV1_api,
            self._k8s_BatchV1_api,
            self._k8s_CustomObjects_api,
            self._k8s_StorageV1_api,
            self._k8s_AppsV1_api,
            self._rook_env,
            self.storage_class)

        self._initialized.set()
        self.config_notify()

        # Periodically re-apply the stored drive groups so that newly matching
        # devices are picked up.
        while not self._shutdown.is_set():
            self._apply_drivegroups(list(self._drive_group_map.values()))
            self._shutdown.wait(self.drive_group_interval)
    @handle_orch_error
    def get_inventory(self,
                      host_filter: Optional[orchestrator.InventoryFilter] = None,
                      refresh: bool = False) -> List[orchestrator.InventoryHost]:
        host_list = None
        if host_filter and host_filter.hosts:
            # Explicit host list
            host_list = host_filter.hosts
        elif host_filter and host_filter.labels:
            # TODO: query k8s API to resolve to host list, and pass
            # it into RookCluster.get_discovered_devices
            raise NotImplementedError()

        discovered_devs = self.rook_cluster.get_discovered_devices(host_list)

        result = []
        for host_name, host_devs in discovered_devs.items():
            # The original conversion of discovered devices is not shown in this
            # excerpt; pass them through unchanged.
            devs = host_devs
            result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))

        return result

    @handle_orch_error
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        return self.rook_cluster.get_hosts()
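    # describe_service() below assembles ServiceDescription objects from the Rook
    # custom resources (CephCluster, CephFilesystem, CephObjectStore, CephNFS,
    # CephRBDMirror) plus the stored drive groups, then folds in image and refresh
    # details from the running daemons reported by _list_daemons().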
    @handle_orch_error
    def describe_service(self,
                         service_type: Optional[str] = None,
                         service_name: Optional[str] = None,
                         refresh: bool = False) -> List[orchestrator.ServiceDescription]:
        now = datetime_now()

        # CephCluster
        cl = self.rook_cluster.rook_api_get(
            "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
        self.log.debug('CephCluster %s' % cl)
        image_name = cl['spec'].get('cephVersion', {}).get('image', None)
        num_nodes = len(self.rook_cluster.get_node_names())

        spec = {}  # type: Dict[str, orchestrator.ServiceDescription]
        if service_type == 'mon' or service_type is None:
            spec['mon'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'mon',
                    placement=PlacementSpec(
                        count=cl['spec'].get('mon', {}).get('count', 1),
                    ),
                ),
                size=cl['spec'].get('mon', {}).get('count', 1),
                container_image_name=image_name,
                last_refresh=now,
            )
        if service_type == 'mgr' or service_type is None:
            spec['mgr'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'mgr',
                    placement=PlacementSpec.from_string('count:1'),
                ),
                size=1,
                container_image_name=image_name,
                last_refresh=now,
            )

        if (
            service_type == 'crash' or service_type is None
            and not cl['spec'].get('crashCollector', {}).get('disable', False)
        ):
            spec['crash'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'crash',
                    placement=PlacementSpec.from_string('*'),
                ),
                size=num_nodes,
                container_image_name=image_name,
                last_refresh=now,
            )

        if service_type == 'mds' or service_type is None:
            # CephFilesystems
            all_fs = self.rook_cluster.get_resource("cephfilesystems")
            for fs in all_fs:
                svc = 'mds.' + fs['metadata']['name']
                if svc in spec:
                    continue
                # FIXME: we are conflating active (+ standby) with count
                active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
                total_mds = active
                if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                    total_mds = active * 2
                spec[svc] = orchestrator.ServiceDescription(
                    spec=ServiceSpec(
                        service_type='mds',
                        service_id=fs['metadata']['name'],
                        placement=PlacementSpec(count=active),
                    ),
                    size=total_mds,
                    container_image_name=image_name,
                    last_refresh=now,
                )

        if service_type == 'rgw' or service_type is None:
            # CephObjectStores
            all_zones = self.rook_cluster.get_resource("cephobjectstores")
            for zone in all_zones:
                svc = 'rgw.' + zone['metadata']['name']
                if svc in spec:
                    continue
                active = zone['spec']['gateway']['instances']
                if 'securePort' in zone['spec']['gateway']:
                    ssl = True
                    port = zone['spec']['gateway']['securePort']
                else:
                    ssl = False
                    port = zone['spec']['gateway']['port'] or 80
                rgw_zone = zone['spec'].get('zone', {}).get('name') or None
                spec[svc] = orchestrator.ServiceDescription(
                    spec=RGWSpec(
                        service_id=zone['metadata']['name'],
                        rgw_zone=rgw_zone,
                        ssl=ssl,
                        rgw_frontend_port=port,
                        placement=PlacementSpec(count=active),
                    ),
                    size=active,
                    container_image_name=image_name,
                    last_refresh=now,
                )

        if service_type == 'nfs' or service_type is None:
            # CephNFSes
            all_nfs = self.rook_cluster.get_resource("cephnfses")
            nfs_pods = self.rook_cluster.describe_pods('nfs', None, None)
            for nfs in all_nfs:
                if nfs['spec']['rados']['pool'] != NFS_POOL_NAME:
                    continue
                nfs_name = nfs['metadata']['name']
                svc = 'nfs.' + nfs_name
                if svc in spec:
                    continue
                active = nfs['spec'].get('server', {}).get('active')
                creation_timestamp = datetime.datetime.strptime(
                    nfs['metadata']['creationTimestamp'], '%Y-%m-%dT%H:%M:%SZ')
                spec[svc] = orchestrator.ServiceDescription(
                    spec=NFSServiceSpec(
                        service_id=nfs_name,
                        placement=PlacementSpec(count=active),
                    ),
                    size=active,
                    last_refresh=now,
                    running=len([1 for pod in nfs_pods if pod['labels']['ceph_nfs'] == nfs_name]),
                    created=creation_timestamp.astimezone(tz=datetime.timezone.utc)
                )

        if service_type == 'osd' or service_type is None:
            # OSDs
            # FIXME: map running OSDs back to their respective services...

            # the catch-all unmanaged
            all_osds = self.rook_cluster.get_osds()
            svc = 'osd'
            spec[svc] = orchestrator.ServiceDescription(
                spec=DriveGroupSpec(
                    unmanaged=True,
                    service_type='osd',
                ),
                size=len(all_osds),
                last_refresh=now,
                running=sum(osd.status.phase == 'Running' for osd in all_osds)
            )

            # drive groups
            for name, dg in self._drive_group_map.items():
                spec[f'osd.{name}'] = orchestrator.ServiceDescription(
                    spec=dg,
                    last_refresh=now,
                )

        if service_type == 'rbd-mirror' or service_type is None:
            # CephRBDMirrors
            all_mirrors = self.rook_cluster.get_resource("cephrbdmirrors")
            for mirror in all_mirrors:
                mirror_name = mirror['metadata']['name']
                svc = 'rbd-mirror.' + mirror_name
                if svc in spec:
                    continue
                spec[svc] = orchestrator.ServiceDescription(
                    spec=ServiceSpec(
                        service_id=mirror_name,
                        service_type="rbd-mirror",
                        placement=PlacementSpec(count=1),
                    ),
                    size=1,
                    last_refresh=now,
                )

        for dd in self._list_daemons():
            if dd.service_name() not in spec:
                continue
            service = spec[dd.service_name()]
            service.running += 1
            if not service.container_image_id:
                service.container_image_id = dd.container_image_id
            if not service.container_image_name:
                service.container_image_name = dd.container_image_name
            if service.last_refresh is None or not dd.last_refresh or dd.last_refresh < service.last_refresh:
                service.last_refresh = dd.last_refresh
            if service.created is None or dd.created is None or dd.created < service.created:
                service.created = dd.created

        return [v for k, v in spec.items()]
    @handle_orch_error
    def list_daemons(self,
                     service_name: Optional[str] = None,
                     daemon_type: Optional[str] = None,
                     daemon_id: Optional[str] = None,
                     host: Optional[str] = None,
                     refresh: bool = False) -> List[orchestrator.DaemonDescription]:
        return self._list_daemons(service_name=service_name,
                                  daemon_type=daemon_type,
                                  daemon_id=daemon_id,
                                  host=host,
                                  refresh=refresh)

    def _list_daemons(self,
                      service_name: Optional[str] = None,
                      daemon_type: Optional[str] = None,
                      daemon_id: Optional[str] = None,
                      host: Optional[str] = None,
                      refresh: bool = False) -> List[orchestrator.DaemonDescription]:
        pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
        self.log.debug('pods %s' % pods)
        result = []
        for p in pods:
            sd = orchestrator.DaemonDescription()
            sd.hostname = p['hostname']
            sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
            sd.status = {
                'Pending': orchestrator.DaemonDescriptionStatus.starting,
                'Running': orchestrator.DaemonDescriptionStatus.running,
                'Succeeded': orchestrator.DaemonDescriptionStatus.stopped,
                'Failed': orchestrator.DaemonDescriptionStatus.error,
                'Unknown': orchestrator.DaemonDescriptionStatus.unknown,
            }[p['phase']]

            if 'ceph_daemon_id' in p['labels']:
                sd.daemon_id = p['labels']['ceph_daemon_id']
            elif 'ceph-osd-id' in p['labels']:
                sd.daemon_id = p['labels']['ceph-osd-id']
            else:
                # Unknown type -- skip it
                continue

            if service_name is not None and service_name != sd.service_name():
                continue
            sd.container_image_name = p['container_image_name']
            sd.container_image_id = p['container_image_id']
            sd.created = p['created']
            sd.last_configured = p['created']
            sd.last_deployed = p['created']
            sd.started = p['started']
            sd.last_refresh = p['refreshed']
            result.append(sd)

        return result
    def _get_pool_params(self) -> Tuple[int, str]:
        num_replicas = self.get_ceph_option('osd_pool_default_size')
        assert type(num_replicas) is int

        leaf_type_id = self.get_ceph_option('osd_crush_chooseleaf_type')
        assert type(leaf_type_id) is int
        crush = self.get('osd_map_crush')
        leaf_type = 'host'
        for t in crush['types']:
            if t['type_id'] == leaf_type_id:
                leaf_type = t['name']
                break
        return num_replicas, leaf_type
    @handle_orch_error
    def remove_service(self, service_name: str, force: bool = False) -> str:
        if service_name == 'rbd-mirror':
            return self.rook_cluster.rm_service('cephrbdmirrors', 'default-rbd-mirror')
        service_type, service_id = service_name.split('.', 1)
        if service_type == 'mds':
            return self.rook_cluster.rm_service('cephfilesystems', service_id)
        elif service_type == 'rgw':
            return self.rook_cluster.rm_service('cephobjectstores', service_id)
        elif service_type == 'nfs':
            # Remove the nfs-ganesha auth entities for this cluster before
            # deleting the CephNFS resource itself.
            ret, out, err = self.mon_command({
                'prefix': 'auth ls'
            })
            matches = re.findall(rf'client\.nfs-ganesha\.{service_id}\..*', out)
            for match in matches:
                self.check_mon_command({
                    'prefix': 'auth rm',
                    'entity': match
                })
            return self.rook_cluster.rm_service('cephnfses', service_id)
        elif service_type == 'rbd-mirror':
            return self.rook_cluster.rm_service('cephrbdmirrors', service_id)
        elif service_type == 'osd':
            if service_id in self._drive_group_map:
                del self._drive_group_map[service_id]
                self._save_drive_groups()
            return f'Removed {service_name}'
        elif service_type == 'ingress':
            self.log.info("{0} service '{1}' does not exist".format('ingress', service_id))
            return 'The Rook orchestrator does not currently support ingress'
        else:
            raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
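    # zap_device() delegates to RookCluster.create_zap_job(), which (in this
    # implementation) is expected to launch a short-lived Kubernetes Job on the
    # target host to wipe the given device; the OrchResult carries either a
    # success message or the wrapped exception.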
    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        try:
            self.rook_cluster.create_zap_job(host, path)
        except Exception as e:
            return OrchResult(None, Exception("Unable to zap device: " + str(e.with_traceback(None))))
        return OrchResult(f'{path} on {host} zapped')
    @handle_orch_error
    def apply_mon(self, spec):
        # type: (ServiceSpec) -> str
        if spec.placement.hosts or spec.placement.label:
            raise RuntimeError("Host list or label is not supported by rook.")

        return self.rook_cluster.update_mon_count(spec.placement.count)

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        try:
            self.rook_cluster.rbd_mirror(spec)
            return OrchResult("Success")
        except Exception as e:
            return OrchResult(None, e)
    @handle_orch_error
    def apply_mds(self, spec):
        # type: (ServiceSpec) -> str
        num_replicas, leaf_type = self._get_pool_params()
        return self.rook_cluster.apply_filesystem(spec, num_replicas, leaf_type)

    @handle_orch_error
    def apply_rgw(self, spec):
        # type: (RGWSpec) -> str
        num_replicas, leaf_type = self._get_pool_params()
        return self.rook_cluster.apply_objectstore(spec, num_replicas, leaf_type)

    @handle_orch_error
    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> str
        try:
            return self.rook_cluster.apply_nfsgw(spec, self)
        except Exception as e:
            return "Unable to create NFS daemon, check logs for more traceback\n" + str(e.with_traceback(None))
    @handle_orch_error
    def remove_daemons(self, names: List[str]) -> List[str]:
        return self.rook_cluster.remove_pods(names)

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        for drive_group in specs:
            self._drive_group_map[str(drive_group.service_id)] = drive_group
        self._save_drive_groups()
        return OrchResult(self._apply_drivegroups(specs))
    def _apply_drivegroups(self, ls: List[DriveGroupSpec]) -> List[str]:
        all_hosts = raise_if_exception(self.get_hosts())
        result_list: List[str] = []
        for drive_group in ls:
            matching_hosts = drive_group.placement.filter_matching_hosts(
                lambda label=None, as_hostspec=None: all_hosts
            )

            if not self.rook_cluster.node_exists(matching_hosts[0]):
                raise RuntimeError("Node '{0}' is not in the Kubernetes "
                                   "cluster".format(matching_hosts))

            # Validate whether cluster CRD can accept individual OSD
            # creations (i.e. not useAllDevices)
            if not self.rook_cluster.can_create_osd():
                raise RuntimeError("Rook cluster configuration does not "
                                   "support OSD creation.")

            result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
        return result_list
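    # The two helpers below persist the applied drive groups in the mgr key/value
    # store under the key "drive_group_map".  The stored value is a JSON object
    # mapping each service_id to the DriveGroupSpec JSON produced by to_json(),
    # e.g. roughly {"my_osds": {"service_type": "osd", ...}} (shape shown for
    # illustration only).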
    def _load_drive_groups(self) -> None:
        stored_drive_group = self.get_store("drive_group_map")
        self._drive_group_map: Dict[str, DriveGroupSpec] = {}
        if stored_drive_group:
            for name, dg in json.loads(stored_drive_group).items():
                try:
                    self._drive_group_map[name] = DriveGroupSpec.from_json(dg)
                except ValueError as e:
                    self.log.error(f'Failed to load drive group {name} ({dg}): {e}')
    def _save_drive_groups(self) -> None:
        json_drive_group_map = {
            name: dg.to_json() for name, dg in self._drive_group_map.items()
        }
        self.set_store("drive_group_map", json.dumps(json_drive_group_map))
    def remove_osds(self,
                    osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False) -> OrchResult[str]:
        assert self._rook_cluster is not None
        if zap:
            raise RuntimeError("Rook does not support zapping devices during OSD removal.")
        res = self._rook_cluster.remove_osds(osd_ids, replace, force, self.mon_command)
        return OrchResult(res)
    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        return self.rook_cluster.add_host_label(host, label)

    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        return self.rook_cluster.remove_host_label(host, label)
    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> str
        # Creates OSDs from a drive group specification.
        #
        # $: ceph orch osd create -i <dg.file>
        #
        # The drivegroup file must only contain one spec at a time.
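        # For illustration, a minimal drive group file for that command might look
        # roughly like the following (field names follow DriveGroupSpec; the values
        # are examples only):
        #
        #   service_type: osd
        #   service_id: example_osds
        #   placement:
        #     host_pattern: 'node1'
        #   data_devices:
        #     paths:
        #       - /dev/sdb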
        targets = []  # type: List[str]
        if drive_group.data_devices and drive_group.data_devices.paths:
            targets += [d.path for d in drive_group.data_devices.paths]
        if drive_group.data_directories:
            targets += drive_group.data_directories

        all_hosts = raise_if_exception(self.get_hosts())

        matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)

        assert len(matching_hosts) == 1

        if not self.rook_cluster.node_exists(matching_hosts[0]):
            raise RuntimeError("Node '{0}' is not in the Kubernetes "
                               "cluster".format(matching_hosts))

        # Validate whether cluster CRD can accept individual OSD
        # creations (i.e. not useAllDevices)
        if not self.rook_cluster.can_create_osd():
            raise RuntimeError("Rook cluster configuration does not "
                               "support OSD creation.")

        return self.rook_cluster.add_osds(drive_group, matching_hosts)
        # TODO: this was the code to update the progress reference:
        """
        def has_osds(matching_hosts: List[str]) -> bool:

            # Find OSD pods on this host
            pod_osd_ids = set()
            pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
                                                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
                                                field_selector="spec.nodeName={0}".format(
                                                    matching_hosts[0]))
            for p in pods.items:
                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))

            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

            found = []
            osdmap = self.get("osd_map")
            for osd in osdmap['osds']:
                osd_id = osd['osd']
                if osd_id not in pod_osd_ids:
                    continue

                metadata = self.get_metadata('osd', "%s" % osd_id)
                if metadata and metadata['devices'] in targets:
                    found.append(osd_id)
                else:
                    self.log.info("ignoring osd {0} {1}".format(
                        osd_id, metadata['devices'] if metadata else 'DNE'))

            return found is not None
        """
    @handle_orch_error
    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
        return self.rook_cluster.blink_light(ident_fault, on, locs)