]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/util/system.py
7 from ceph_volume
import process
8 from . import as_string
11 # TODO: get these out of here and into a common area for others to consume
12 if platform
.system() == 'FreeBSD':
14 DEFAULT_FS_TYPE
= 'zfs'
15 PROCDIR
= '/compat/linux/proc'
16 # FreeBSD does not have blockdevices any more
21 DEFAULT_FS_TYPE
= 'xfs'
23 BLOCKDIR
= '/sys/block'
28 return str(uuid
.uuid4())
31 def get_ceph_user_ids():
33 Return the id and gid of the ceph user
36 user
= pwd
.getpwnam('ceph')
38 # is this even possible?
39 raise RuntimeError('"ceph" user is not available in the current system')
40 return user
[2], user
[3]
43 def mkdir_p(path
, chown
=True):
45 A `mkdir -p` that defaults to chown the path to the ceph user
50 if e
.errno
== errno
.EEXIST
:
55 uid
, gid
= get_ceph_user_ids()
56 os
.chown(path
, uid
, gid
)
59 def chown(path
, recursive
=True):
61 ``chown`` a path to the ceph user (uid and guid fetched at runtime)
63 uid
, gid
= get_ceph_user_ids()
64 if os
.path
.islink(path
):
65 path
= os
.path
.realpath(path
)
67 process
.run(['chown', '-R', 'ceph:ceph', path
])
69 os
.chown(path
, uid
, gid
)
74 Detect if a file path is a binary or not. Will falsely report as binary
75 when utf-16 encoded. In the ceph universe there is no such risk (yet)
77 with
open(path
, 'rb') as fp
:
78 contents
= fp
.read(8192)
79 if b
'\x00' in contents
: # a null byte may signal binary
84 class tmp_mount(object):
86 Temporarily mount a device on a temporary directory,
87 and unmount it upon exit
90 def __init__(self
, device
):
95 self
.path
= tempfile
.mkdtemp()
105 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
114 def path_is_mounted(path
, destination
=None):
116 Check if the given path is mounted
118 mounts
= get_mounts(paths
=True)
119 realpath
= os
.path
.realpath(path
)
120 mounted_locations
= mounts
.get(realpath
, [])
123 if destination
.startswith('/'):
124 destination
= os
.path
.realpath(destination
)
125 return destination
in mounted_locations
126 return mounted_locations
!= []
129 def device_is_mounted(dev
, destination
=None):
131 Check if the given device is mounted, optionally validating that a
134 mounts
= get_mounts(devices
=True)
135 realpath
= os
.path
.realpath(dev
) if dev
.startswith('/') else dev
136 destination
= os
.path
.realpath(destination
) if destination
else None
137 mounted_locations
= mounts
.get(realpath
, [])
140 return destination
in mounted_locations
141 return mounted_locations
!= []
144 def get_mounts(devices
=False, paths
=False):
146 Create a mapping of all available system mounts so that other helpers can
147 detect nicely what path or device is mounted
149 It ignores (most of) non existing devices, but since some setups might need
150 some extra device information, it will make an exception for:
155 If ``devices`` is set to ``True`` the mapping will be a device-to-path(s),
156 if ``paths`` is set to ``True`` then the mapping will be
161 do_not_skip
= ['tmpfs', 'devtmpfs']
162 default_to_devices
= devices
is False and paths
is False
164 with
open(PROCDIR
+ '/mounts', 'rb') as mounts
:
165 proc_mounts
= mounts
.readlines()
167 for line
in proc_mounts
:
168 fields
= [as_string(f
) for f
in line
.split()]
171 device
= os
.path
.realpath(fields
[0]) if fields
[0].startswith('/') else fields
[0]
172 path
= os
.path
.realpath(fields
[1])
173 # only care about actual existing devices
174 if not os
.path
.exists(device
) or not device
.startswith('/'):
175 if device
not in do_not_skip
:
177 if device
in devices_mounted
.keys():
178 devices_mounted
[device
].append(path
)
180 devices_mounted
[device
] = [path
]
181 if path
in paths_mounted
.keys():
182 paths_mounted
[path
].append(device
)
184 paths_mounted
[path
] = [device
]
186 # Default to returning information for devices if
187 if devices
is True or default_to_devices
:
188 return devices_mounted