import argparse
+import os
+from ceph_volume import terminal
+from ceph_volume import decorators
+from ceph_volume.util import disk
class LVPath(object):
<vg name>/<lv name>
+ Or a full path to a device, like ``/dev/sda``
+
Because for LVM it is better to be specific on what group does an lv
belongs to.
"""
def __call__(self, string):
error = None
+ if string.startswith('/'):
+ if not os.path.exists(string):
+ error = "Argument (device) does not exist: %s" % string
+ raise argparse.ArgumentError(None, error)
+ else:
+ return string
try:
vg, lv = string.split('/')
except ValueError:
if error:
raise argparse.ArgumentError(None, error)
return string
+
+
+class OSDPath(object):
+ """
+ Validate path exists and it looks like an OSD directory.
+ """
+
+ @decorators.needs_root
+ def __call__(self, string):
+ if not os.path.exists(string):
+ error = "Path does not exist: %s" % string
+ raise argparse.ArgumentError(None, error)
+
+ arg_is_partition = disk.is_partition(string)
+ if arg_is_partition:
+ return os.path.abspath(string)
+ absolute_path = os.path.abspath(string)
+ if not os.path.isdir(absolute_path):
+ error = "Argument is not a directory or device which is required to scan"
+ raise argparse.ArgumentError(None, error)
+ key_files = ['ceph_fsid', 'fsid', 'keyring', 'ready', 'type', 'whoami']
+ dir_files = os.listdir(absolute_path)
+ for key_file in key_files:
+ if key_file not in dir_files:
+ terminal.error('All following files must exist in path: %s' % ' '.join(key_files))
+ error = "Required file (%s) was not found in OSD dir path: %s" % (
+ key_file,
+ absolute_path
+ )
+ raise argparse.ArgumentError(None, error)
+
+ return os.path.abspath(string)