]> git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/util/arg_validators.py
534c9aa64673693c617f461ab14c897fc3f553ab
[ceph.git] / ceph / src / ceph-volume / ceph_volume / util / arg_validators.py
1 import argparse
2 import os
3 from ceph_volume import terminal
4 from ceph_volume import decorators
5 from ceph_volume.util import disk
6 from ceph_volume.util.device import Device
7
8
9 class ValidDevice(object):
10
11 def __init__(self, as_string=False):
12 self.as_string = as_string
13
14 def __call__(self, string):
15 device = Device(string)
16 error = None
17 if not device.exists:
18 error = "Unable to proceed with non-existing device: %s" % string
19 elif device.has_gpt_headers:
20 error = "GPT headers found, they must be removed on: %s" % string
21
22 if error:
23 raise argparse.ArgumentError(None, error)
24
25 if self.as_string:
26 if device.is_lv:
27 # all codepaths expect an lv path to be returned in this format
28 return "{}/{}".format(device.vg_name, device.lv_name)
29 return string
30 return device
31
32
33 class OSDPath(object):
34 """
35 Validate path exists and it looks like an OSD directory.
36 """
37
38 @decorators.needs_root
39 def __call__(self, string):
40 if not os.path.exists(string):
41 error = "Path does not exist: %s" % string
42 raise argparse.ArgumentError(None, error)
43
44 arg_is_partition = disk.is_partition(string)
45 if arg_is_partition:
46 return os.path.abspath(string)
47 absolute_path = os.path.abspath(string)
48 if not os.path.isdir(absolute_path):
49 error = "Argument is not a directory or device which is required to scan"
50 raise argparse.ArgumentError(None, error)
51 key_files = ['ceph_fsid', 'fsid', 'keyring', 'ready', 'type', 'whoami']
52 dir_files = os.listdir(absolute_path)
53 for key_file in key_files:
54 if key_file not in dir_files:
55 terminal.error('All following files must exist in path: %s' % ' '.join(key_files))
56 error = "Required file (%s) was not found in OSD dir path: %s" % (
57 key_file,
58 absolute_path
59 )
60 raise argparse.ArgumentError(None, error)
61
62 return os.path.abspath(string)
63
64
65 def exclude_group_options(parser, groups, argv=None):
66 """
67 ``argparse`` has the ability to check for mutually exclusive options, but
68 it only allows a basic XOR behavior: only one flag can be used from
69 a defined group of options. This doesn't help when two groups of options
70 need to be separated. For example, with filestore and bluestore, neither
71 set can be used in conjunction with the other set.
72
73 This helper validator will consume the parser to inspect the group flags,
74 and it will group them together from ``groups``. This allows proper error
75 reporting, matching each incompatible flag with its group name.
76
77 :param parser: The argparse object, once it has configured all flags. It is
78 required to contain the group names being used to validate.
79 :param groups: A list of group names (at least two), with the same used for
80 ``add_argument_group``
81 :param argv: Consume the args (sys.argv) directly from this argument
82
83 .. note: **Unfortunately** this will not be able to validate correctly when
84 using default flags. In the case of filestore vs. bluestore, ceph-volume
85 defaults to --bluestore, but we can't check that programmatically, we can
86 only parse the flags seen via argv
87 """
88 # Reduce the parser groups to only the groups we need to intersect
89 parser_groups = [g for g in parser._action_groups if g.title in groups]
90 # A mapping of the group name to flags/options
91 group_flags = {}
92 flags_to_verify = []
93 for group in parser_groups:
94 # option groups may have more than one item in ``option_strings``, this
95 # will loop over ``_group_actions`` which contains the
96 # ``option_strings``, like ``['--filestore']``
97 group_flags[group.title] = [
98 option for group_action in group._group_actions
99 for option in group_action.option_strings
100 ]
101
102 # Gather all the flags present in the groups so that we only check on those.
103 for flags in group_flags.values():
104 flags_to_verify.extend(flags)
105
106 seen = []
107 last_flag = None
108 last_group = None
109 for flag in argv:
110 if flag not in flags_to_verify:
111 continue
112 for group_name, flags in group_flags.items():
113 if flag in flags:
114 seen.append(group_name)
115 # We are mutually excluding groups, so having more than 1 group
116 # in ``seen`` means we must raise an error
117 if len(set(seen)) == len(groups):
118 terminal.warning('Incompatible flags were found, some values may get ignored')
119 msg = 'Cannot use %s (%s) with %s (%s)' % (
120 last_flag, last_group, flag, group_name
121 )
122 terminal.warning(msg)
123 last_group = group_name
124 last_flag = flag