#!/usr/bin/env python
#
-# Copyright (C) 2015, 2016 Red Hat <contact@redhat.com>
+# Copyright (C) 2015, 2016, 2017 Red Hat <contact@redhat.com>
# Copyright (C) 2014 Inktank <info@inktank.com>
# Copyright (C) 2014 Cloudwatt <libre.licensing@cloudwatt.com>
# Copyright (C) 2014 Catalyst.net Ltd
import uuid
import time
import shlex
+import shutil
import pwd
import grp
import textwrap
PROCDIR = '/compat/linux/proc'
# FreeBSD does not have blockdevices any more
BLOCKDIR = '/dev'
+ ROOTGROUP = 'wheel'
else:
FREEBSD = False
DEFAULT_FS_TYPE = 'xfs'
PROCDIR = '/proc'
BLOCKDIR = '/sys/block'
+ ROOTGROUP = 'root'
"""
OSD STATUS Definition
return _bytes2str(out), _bytes2str(err), process.returncode
+def command_with_stdin(arguments, stdin):
+ LOG.info("Running command with stdin: " + " ".join(arguments))
+ process = subprocess.Popen(
+ arguments,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ out, err = process.communicate(stdin)
+ LOG.debug(out)
+ if process.returncode != 0:
+ LOG.error(err)
+ raise SystemExit(
+ "'{cmd}' failed with status code {returncode}".format(
+ cmd=arguments,
+ returncode=process.returncode,
+ )
+ )
+ return out
+
+
def _bytes2str(string):
return string.decode('utf-8') if isinstance(string, bytes) else string
cluster,
fsid,
keyring,
+ path,
):
"""
- Accocates an OSD id on the given cluster.
+ Allocates an OSD id on the given cluster.
:raises: Error if the call to allocate the OSD id fails.
:return: The allocated OSD id.
"""
+ lockbox_path = os.path.join(STATEDIR, 'osd-lockbox', fsid)
+ lockbox_osd_id = read_one_line(lockbox_path, 'whoami')
+ osd_keyring = os.path.join(path, 'keyring')
+ if lockbox_osd_id:
+ LOG.debug('Getting OSD id from Lockbox...')
+ osd_id = lockbox_osd_id
+ shutil.move(os.path.join(lockbox_path, 'osd_keyring'),
+ osd_keyring)
+ path_set_context(osd_keyring)
+ os.unlink(os.path.join(lockbox_path, 'whoami'))
+ return osd_id
LOG.debug('Allocating OSD id...')
+ secrets = Secrets()
try:
- osd_id = _check_output(
- args=[
+ wanttobe = read_one_line(path, 'wanttobe')
+ if os.path.exists(os.path.join(path, 'wanttobe')):
+ os.unlink(os.path.join(path, 'wanttobe'))
+ id_arg = wanttobe and [wanttobe] or []
+ osd_id = command_with_stdin(
+ [
'ceph',
'--cluster', cluster,
'--name', 'client.bootstrap-osd',
'--keyring', keyring,
- 'osd', 'create', '--concise',
+ '-i', '-',
+ 'osd', 'new',
fsid,
- ],
+ ] + id_arg,
+ secrets.get_json()
)
except subprocess.CalledProcessError as e:
raise Error('ceph osd create failed', e, e.output)
osd_id = must_be_one_line(osd_id)
check_osd_id(osd_id)
+ secrets.write_osd_keyring(osd_keyring, osd_id)
return osd_id
rawdev,
] + cryptsetup_parameters
- def run(args, stdin):
- LOG.info(" ".join(args))
- process = subprocess.Popen(
- args,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- out, err = process.communicate(stdin)
- LOG.debug(out)
- LOG.error(err)
- assert process.returncode == 0
-
try:
if luks:
if format_dev:
- run(luksFormat_args, key)
- run(luksOpen_args, key)
+ command_with_stdin(luksFormat_args, key)
+ command_with_stdin(luksOpen_args, key)
else:
# Plain mode has no format function, nor any validation
# that the key is correct.
- run(create_args, key)
+ command_with_stdin(create_args, key)
# set proper ownership of mapped device
command_check_call(['chown', 'ceph:ceph', dev])
return dev
metavar='UUID',
help='unique OSD uuid to assign this disk to',
)
+ parser.add_argument(
+ '--osd-id',
+ metavar='ID',
+ help='unique OSD id to assign this disk to',
+ )
parser.add_argument(
'--crush-device-class',
help='crush device class to assign this disk to',
return None
+class Secrets(object):
+
+ def __init__(self):
+ secret, stderr, ret = command(['ceph-authtool', '--gen-print-key'])
+ LOG.debug("stderr " + stderr)
+ assert ret == 0
+ self.keys = {
+ 'cephx_secret': secret.strip(),
+ }
+
+ def write_osd_keyring(self, keyring, osd_id):
+ command_check_call(
+ [
+ 'ceph-authtool', keyring,
+ '--create-keyring',
+ '--name', 'osd.' + str(osd_id),
+ '--add-key', self.keys['cephx_secret'],
+ ])
+ path_set_context(keyring)
+
+ def get_json(self):
+ return bytearray(json.dumps(self.keys), 'ascii')
+
+
+class LockboxSecrets(Secrets):
+
+ def __init__(self, args):
+ super(LockboxSecrets, self).__init__()
+
+ key_size = CryptHelpers.get_dmcrypt_keysize(args)
+ key = open('/dev/urandom', 'rb').read(key_size / 8)
+ base64_key = base64.b64encode(key).decode('ascii')
+
+ secret, stderr, ret = command(['ceph-authtool', '--gen-print-key'])
+ LOG.debug("stderr " + stderr)
+ assert ret == 0
+
+ self.keys.update({
+ 'dmcrypt_key': base64_key,
+ 'cephx_lockbox_secret': secret.strip(),
+ })
+
+ def write_lockbox_keyring(self, path, osd_uuid):
+ keyring = os.path.join(path, 'keyring')
+ command_check_call(
+ [
+ 'ceph-authtool', keyring,
+ '--create-keyring',
+ '--name', 'client.osd-lockbox.' + osd_uuid,
+ '--add-key', self.keys['cephx_lockbox_secret'],
+ ])
+ path_set_context(keyring)
+
+
class Lockbox(object):
def __init__(self, args):
def create_partition(self):
self.device = Device.factory(self.args.lockbox, argparse.Namespace())
- partition_number = 3
+ partition_number = 5
self.device.create_partition(uuid=self.args.lockbox_uuid,
name='lockbox',
num=partition_number,
self.partition = self.create_partition()
def create_key(self):
- key_size = CryptHelpers.get_dmcrypt_keysize(self.args)
- key = open('/dev/urandom', 'rb').read(key_size / 8)
- base64_key = base64.b64encode(key)
cluster = self.args.cluster
bootstrap = self.args.prepare_key_template.format(cluster=cluster,
statedir=STATEDIR)
- command_check_call(
- [
- 'ceph',
- '--cluster', cluster,
- '--name', 'client.bootstrap-osd',
- '--keyring', bootstrap,
- 'config-key',
- 'put',
- 'dm-crypt/osd/' + self.args.osd_uuid + '/luks',
- base64_key,
- ],
- )
- keyring, stderr, ret = command(
+ path = self.get_mount_point()
+ secrets = LockboxSecrets(self.args)
+ id_arg = self.args.osd_id and [self.args.osd_id] or []
+ osd_id = command_with_stdin(
[
'ceph',
'--cluster', cluster,
'--name', 'client.bootstrap-osd',
'--keyring', bootstrap,
- 'auth',
- 'get-or-create',
- 'client.osd-lockbox.' + self.args.osd_uuid,
- 'mon',
- ('allow command "config-key get" with key="dm-crypt/osd/' +
- self.args.osd_uuid + '/luks"'),
- ],
+ '-i', '-',
+ 'osd', 'new', self.args.osd_uuid,
+ ] + id_arg,
+ secrets.get_json()
)
- LOG.debug("stderr " + stderr)
- assert ret == 0
- path = self.get_mount_point()
- open(os.path.join(path, 'keyring'), 'w').write(keyring)
+ secrets.write_lockbox_keyring(path, self.args.osd_uuid)
+ osd_id = must_be_one_line(osd_id)
+ check_osd_id(osd_id)
+ write_one_line(path, 'whoami', osd_id)
+ secrets.write_osd_keyring(os.path.join(path, 'osd_keyring'), osd_id)
write_one_line(path, 'key-management-mode', KEY_MANAGEMENT_MODE_V1)
def symlink_spaces(self, path):
write_one_line(path, 'ceph_fsid', self.args.cluster_uuid)
write_one_line(path, 'fsid', self.args.osd_uuid)
+ if self.args.osd_id:
+ write_one_line(path, 'wanttobe', self.args.osd_id)
if self.args.crush_device_class:
write_one_line(path, 'crush_device_class',
self.args.crush_device_class)
write_one_line(path, 'type', 'bluestore')
-#
-# Temporary workaround: if ceph-osd --mkfs does not
-# complete within 5 minutes, assume it is blocked
-# because of http://tracker.ceph.com/issues/13522
-# and retry a few times.
-#
-# Remove this function calls with command_check_call
-# when http://tracker.ceph.com/issues/13522 is fixed
-#
-def ceph_osd_mkfs(arguments):
- timeout = _get_command_executable(['timeout'])
- mkfs_ok = False
- error = 'unknown error'
- for delay in os.environ.get('CEPH_OSD_MKFS_DELAYS',
- '300 300 300 300 300').split():
- try:
- _check_output(timeout + [delay] + arguments)
- mkfs_ok = True
- break
- except subprocess.CalledProcessError as e:
- error = e.output
- if e.returncode == 124: # timeout fired, retry
- LOG.debug('%s timed out : %s (retry)'
- % (str(arguments), error))
- else:
- break
- if not mkfs_ok:
- raise Error('%s failed : %s' % (str(arguments), error))
-
-
def mkfs(
path,
cluster,
osd_type = read_one_line(path, 'type')
if osd_type == 'bluestore':
- ceph_osd_mkfs(
+ command_check_call(
[
'ceph-osd',
'--cluster', cluster,
'--mkfs',
- '--mkkey',
'-i', osd_id,
'--monmap', monmap,
'--osd-data', path,
'--osd-uuid', fsid,
- '--keyring', os.path.join(path, 'keyring'),
'--setuser', get_ceph_user(),
'--setgroup', get_ceph_group(),
],
)
elif osd_type == 'filestore':
- ceph_osd_mkfs(
+ command_check_call(
[
'ceph-osd',
'--cluster', cluster,
'--mkfs',
- '--mkkey',
'-i', osd_id,
'--monmap', monmap,
'--osd-data', path,
'--osd-journal', os.path.join(path, 'journal'),
'--osd-uuid', fsid,
- '--keyring', os.path.join(path, 'keyring'),
'--setuser', get_ceph_user(),
'--setgroup', get_ceph_group(),
],
raise Error('unrecognized objectstore type %s' % osd_type)
-def auth_key(
- path,
- cluster,
- osd_id,
- keyring,
-):
- try:
- # try dumpling+ cap scheme
- command_check_call(
- [
- 'ceph',
- '--cluster', cluster,
- '--name', 'client.bootstrap-osd',
- '--keyring', keyring,
- 'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id),
- '-i', os.path.join(path, 'keyring'),
- 'osd', 'allow *',
- 'mon', 'allow profile osd',
- ],
- )
- except subprocess.CalledProcessError as err:
- if err.returncode == errno.EINVAL:
- # try old cap scheme
- command_check_call(
- [
- 'ceph',
- '--cluster', cluster,
- '--name', 'client.bootstrap-osd',
- '--keyring', keyring,
- 'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id),
- '-i', os.path.join(path, 'keyring'),
- 'osd', 'allow *',
- 'mon', 'allow rwx',
- ],
- )
- else:
- raise
-
-
def get_mount_point(cluster, osd_id):
parent = STATEDIR + '/osd'
return os.path.join(
cluster=cluster,
fsid=fsid,
keyring=keyring,
+ path=path,
)
write_one_line(path, 'whoami', osd_id)
LOG.debug('OSD id is %s', osd_id)
pass
if not os.path.exists(os.path.join(path, 'active')):
- LOG.debug('Authorizing OSD key...')
- auth_key(
- path=path,
- cluster=cluster,
- osd_id=osd_id,
- keyring=keyring,
- )
write_one_line(path, 'active', 'ok')
LOG.debug('%s osd.%s data dir is ready at %s', cluster, osd_id, path)
return (osd_id, cluster)
###########################
-def _remove_from_crush_map(cluster, osd_id):
- LOG.info("Prepare to remove osd.%s from crush map..." % osd_id)
- command([
- 'ceph',
- 'osd',
- 'crush',
- 'remove',
- 'osd.%s' % osd_id,
- ])
-
-
-def _delete_osd_auth_key(cluster, osd_id):
- LOG.info("Prepare to delete osd.%s cephx key..." % osd_id)
- command([
- 'ceph',
- 'auth',
- 'del',
- 'osd.%s' % osd_id,
- ])
-
-
-def _deallocate_osd_id(cluster, osd_id):
- LOG.info("Prepare to deallocate the osd-id: %s..." % osd_id)
- command([
- 'ceph',
- 'osd',
- 'rm',
- '%s' % osd_id,
- ])
-
-
def _remove_lockbox(uuid, cluster):
- command([
- 'ceph',
- '--cluster', cluster,
- 'auth',
- 'del',
- 'client.osd-lockbox.' + uuid,
- ])
- command([
- 'ceph',
- '--cluster', cluster,
- 'config-key',
- 'del',
- 'dm-crypt/osd/' + uuid + '/luks',
- ])
lockbox = os.path.join(STATEDIR, 'osd-lockbox')
if not os.path.exists(lockbox):
return
raise Error("Could not destroy the active osd. (osd-id: %s)" %
osd_id)
- # Remove OSD from crush map
- _remove_from_crush_map(args.cluster, osd_id)
-
- # Remove OSD cephx key
- _delete_osd_auth_key(args.cluster, osd_id)
-
- # Deallocate OSD ID
- _deallocate_osd_id(args.cluster, osd_id)
+ if args.purge:
+ action = 'purge'
+ else:
+ action = 'destroy'
+ LOG.info("Prepare to %s osd.%s" % (action, osd_id))
+ command([
+ 'ceph',
+ 'osd',
+ action,
+ 'osd.%s' % osd_id,
+ '--yes-i-really-mean-it',
+ ])
# we remove the crypt map and device mapper (if dmcrypt is True)
if dmcrypt:
if not os.path.exists(path):
raise Error('%s does not exist' % path)
- if path_is_diskdevice(path):
+ if not path_is_diskdevice(path):
raise Error('%s is not a block device' % path)
if (is_partition(path) and
if not os.path.exists(args.dev):
raise Error('%s does not exist' % args.dev)
+ if is_suppressed(args.dev):
+ LOG.info('suppressed activate request on space %s', args.dev)
+ return
+
cluster = None
osd_id = None
osd_uuid = None
disk = os.path.realpath(path)
if not os.path.exists(disk):
raise Error('does not exist', path)
- if ldev_is_diskdevice(path):
+ if not ldev_is_diskdevice(path):
raise Error('not a block device', path)
base = get_dev_name(disk)
def main_fix(args):
# A hash table containing 'path': ('uid', 'gid', blocking, recursive)
fix_table = [
- ('/usr/bin/ceph-mon', 'root', 'root', True, False),
- ('/usr/bin/ceph-mds', 'root', 'root', True, False),
- ('/usr/bin/ceph-osd', 'root', 'root', True, False),
- ('/usr/bin/radosgw', 'root', 'root', True, False),
- ('/etc/ceph', 'root', 'root', True, True),
+ ('/usr/bin/ceph-mon', 'root', ROOTGROUP, True, False),
+ ('/usr/bin/ceph-mds', 'root', ROOTGROUP, True, False),
+ ('/usr/bin/ceph-osd', 'root', ROOTGROUP, True, False),
+ ('/usr/bin/radosgw', 'root', ROOTGROUP, True, False),
+ ('/etc/ceph', 'root', ROOTGROUP, True, True),
('/var/run/ceph', 'ceph', 'ceph', True, True),
('/var/log/ceph', 'ceph', 'ceph', True, True),
('/var/log/radosgw', 'ceph', 'ceph', True, True),
destroy_parser = subparsers.add_parser(
'destroy',
formatter_class=argparse.RawDescriptionHelpFormatter,
- description=textwrap.fill(textwrap.dedent("""\
- Destroy the OSD located at PATH.
- It removes the OSD from the cluster, the crushmap and
- deallocates the OSD id. An OSD must be down before it
- can be destroyed.
- """)),
+ description=textwrap.fill(textwrap.dedent("""\ Destroy the OSD located at PATH. It removes the OSD from the
+ cluster and marks it destroyed. An OSD must be down before it
+ can be destroyed. Once it is destroyed, a new OSD can be created
+ in its place, reusing the same OSD id and position (e.g. after
+ a failed HDD or SSD is replaced). Alternatively, if the
+ --purge option is also specified, the OSD is removed from the
+ CRUSH map and the OSD id is deallocated.""")),
help='Destroy a Ceph OSD')
destroy_parser.add_argument(
'--cluster',
action='store_true', default=False,
help='option to erase data and partition',
)
+ destroy_parser.add_argument(
+ '--purge',
+ action='store_true', default=False,
+ help='option to remove OSD from CRUSH map and deallocate the id',
+ )
destroy_parser.set_defaults(
func=main_destroy,
)