]> git.proxmox.com Git - ceph.git/blame - ceph/src/ceph-volume/ceph_volume/tests/util/test_system.py
update sources to 12.2.8
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / util / test_system.py
CommitLineData
d2e6a577
FG
1import os
2import pwd
3import getpass
3efd9988 4import pytest
d2e6a577
FG
5from textwrap import dedent
6from ceph_volume.util import system
7
8
9class TestMkdirP(object):
10
11 def test_existing_dir_does_not_raise_w_chown(self, monkeypatch, tmpdir):
12 user = pwd.getpwnam(getpass.getuser())
13 uid, gid = user[2], user[3]
14 monkeypatch.setattr(system, 'get_ceph_user_ids', lambda: (uid, gid,))
15 path = str(tmpdir)
16 system.mkdir_p(path)
17 assert os.path.isdir(path)
18
19 def test_new_dir_w_chown(self, monkeypatch, tmpdir):
20 user = pwd.getpwnam(getpass.getuser())
21 uid, gid = user[2], user[3]
22 monkeypatch.setattr(system, 'get_ceph_user_ids', lambda: (uid, gid,))
23 path = os.path.join(str(tmpdir), 'new')
24 system.mkdir_p(path)
25 assert os.path.isdir(path)
26
27 def test_existing_dir_does_not_raise_no_chown(self, tmpdir):
28 path = str(tmpdir)
29 system.mkdir_p(path, chown=False)
30 assert os.path.isdir(path)
31
32 def test_new_dir_no_chown(self, tmpdir):
33 path = os.path.join(str(tmpdir), 'new')
34 system.mkdir_p(path, chown=False)
35 assert os.path.isdir(path)
36
37
3efd9988
FG
38@pytest.fixture
39def fake_proc(tmpdir, monkeypatch):
40 PROCDIR = str(tmpdir)
41 proc_path = os.path.join(PROCDIR, 'mounts')
42 with open(proc_path, 'w') as f:
43 f.write(dedent("""nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
44 rootfs / rootfs rw 0 0
45 sysfs /sys sysfs rw,seclabel,nosuid,nodev,noexec,relatime 0 0
46 proc /proc proc rw,nosuid,nodev,noexec,relatime 0 0
47 devtmpfs /dev devtmpfs rw,seclabel,nosuid,size=238292k,nr_inodes=59573,mode=755 0 0
48 securityfs /sys/kernel/security securityfs rw,nosuid,nodev,noexec,relatime 0 0
49 tmpfs /dev/shm tmpfs rw,seclabel,nosuid,nodev 0 0
50 devpts /dev/pts devpts rw,seclabel,nosuid,noexec,relatime,gid=5,mode=620,ptmxmode=000 0 0
51 tmpfs /run tmpfs rw,seclabel,nosuid,nodev,mode=755 0 0
52 tmpfs /sys/fs/cgroup tmpfs ro,seclabel,nosuid,nodev,noexec,mode=755 0 0
53 cgroup /sys/fs/cgroup/systemd cgroup rw,nosuid,nodev,noexec,relatime,xattr,release_agent=/usr/lib/systemd/systemd-cgroups-agent,name=systemd 0 0
54 cgroup /sys/fs/cgroup/freezer cgroup rw,nosuid,nodev,noexec,relatime,freezer 0 0
55 configfs /sys/kernel/config configfs rw,relatime 0 0
56 /dev/mapper/VolGroup00-LogVol00 / xfs rw,seclabel,relatime,attr2,inode64,noquota 0 0
57 selinuxfs /sys/fs/selinux selinuxfs rw,relatime 0 0
58 debugfs /sys/kernel/debug debugfs rw,relatime 0 0
59 hugetlbfs /dev/hugepages hugetlbfs rw,seclabel,relatime 0 0
60 mqueue /dev/mqueue mqueue rw,seclabel,relatime 0 0
61 sunrpc /far/lib/nfs/rpc_pipefs rpc_pipefs rw,relatime 0 0
62 /dev/sde4 /two/field/path
63 nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
64 /dev/sde2 /boot xfs rw,seclabel,relatime,attr2,inode64,noquota 0 0
65 tmpfs /far/lib/ceph/osd/ceph-5 tmpfs rw,seclabel,relatime 0 0
66 tmpfs /far/lib/ceph/osd/ceph-7 tmpfs rw,seclabel,relatime 0 0
67 /dev/sda1 /far/lib/ceph/osd/ceph-0 xfs rw,seclabel,noatime,attr2,inode64,noquota 0 0
68 tmpfs /run/user/1000 tmpfs rw,seclabel,nosuid,nodev,relatime,size=50040k,mode=700,uid=1000,gid=1000 0 0
69 /dev/sdc2 /boot xfs rw,seclabel,relatime,attr2,inode64,noquota 0 0
70 tmpfs /run/user/1000 tmpfs rw,seclabel,mode=700,uid=1000,gid=1000 0 0"""))
71 monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
72 monkeypatch.setattr(os.path, 'exists', lambda x: True)
73
74
75class TestPathIsMounted(object):
76
77 def test_is_mounted(self, fake_proc):
78 assert system.path_is_mounted('/boot') is True
79
80 def test_is_not_mounted(self, fake_proc):
81 assert system.path_is_mounted('/far/fib/feph') is False
82
83 def test_is_not_mounted_at_destination(self, fake_proc):
84 assert system.path_is_mounted('/boot', destination='/dev/sda1') is False
85
86 def test_is_mounted_at_destination(self, fake_proc):
87 assert system.path_is_mounted('/boot', destination='/dev/sdc2') is True
88
89
90class TestDeviceIsMounted(object):
91
92 def test_is_mounted(self, fake_proc):
93 assert system.device_is_mounted('/dev/sda1') is True
94
95 def test_path_is_not_device(self, fake_proc):
96 assert system.device_is_mounted('/far/lib/ceph/osd/ceph-7') is False
97
98 def test_is_not_mounted_at_destination(self, fake_proc):
99 assert system.device_is_mounted('/dev/sda1', destination='/far/lib/ceph/osd/test-1') is False
100
101 def test_is_mounted_at_destination(self, fake_proc):
102 assert system.device_is_mounted('/dev/sda1', destination='/far/lib/ceph/osd/ceph-7') is False
103
b32b8144
FG
104 def test_is_realpath_dev_mounted_at_destination(self, fake_proc, monkeypatch):
105 monkeypatch.setattr(system.os.path, 'realpath', lambda x: '/dev/sda1' if 'foo' in x else x)
106 result = system.device_is_mounted('/dev/maper/foo', destination='/far/lib/ceph/osd/ceph-0')
107 assert result is True
108
109 def test_is_realpath_path_mounted_at_destination(self, fake_proc, monkeypatch):
110 monkeypatch.setattr(
111 system.os.path, 'realpath',
112 lambda x: '/far/lib/ceph/osd/ceph-0' if 'symlink' in x else x)
113 result = system.device_is_mounted('/dev/sda1', destination='/symlink/lib/ceph/osd/ceph-0')
114 assert result is True
115
3efd9988
FG
116
117class TestGetMounts(object):
d2e6a577
FG
118
119 def test_not_mounted(self, tmpdir, monkeypatch):
120 PROCDIR = str(tmpdir)
121 proc_path = os.path.join(PROCDIR, 'mounts')
122 with open(proc_path, 'w') as f:
123 f.write('')
124 monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
3efd9988 125 assert system.get_mounts() == {}
d2e6a577 126
3efd9988
FG
127 def test_is_mounted_(self, fake_proc):
128 result = system.get_mounts()
129 assert result['/dev/sdc2'] == ['/boot']
d2e6a577 130
3efd9988
FG
131 def test_ignores_two_fields(self, fake_proc):
132 result = system.get_mounts()
133 assert result.get('/dev/sde4') is None
d2e6a577 134
3efd9988
FG
135 def test_tmpfs_is_reported(self, fake_proc):
136 result = system.get_mounts()
137 assert result['tmpfs'][0] == '/dev/shm'
138
139 def test_non_skip_devs_arent_reported(self, fake_proc):
140 result = system.get_mounts()
141 assert result.get('cgroup') is None
142
143 def test_multiple_mounts_are_appended(self, fake_proc):
144 result = system.get_mounts()
145 assert len(result['tmpfs']) == 7
d2e6a577 146
3efd9988 147 def test_nonexistent_devices_are_skipped(self, tmpdir, monkeypatch):
d2e6a577
FG
148 PROCDIR = str(tmpdir)
149 proc_path = os.path.join(PROCDIR, 'mounts')
150 with open(proc_path, 'w') as f:
151 f.write(dedent("""nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
3efd9988
FG
152 /dev/sda1 /far/lib/ceph/osd/ceph-0 xfs rw,attr2,inode64,noquota 0 0
153 /dev/sda2 /far/lib/ceph/osd/ceph-1 xfs rw,attr2,inode64,noquota 0 0"""))
d2e6a577 154 monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
3efd9988
FG
155 monkeypatch.setattr(os.path, 'exists', lambda x: False if x == '/dev/sda1' else True)
156 result = system.get_mounts()
157 assert result.get('/dev/sda1') is None
158
159
160class TestIsBinary(object):
161
162 def test_is_binary(self, tmpfile):
163 binary_path = tmpfile(contents='asd\n\nlkjh\x00')
164 assert system.is_binary(binary_path)
165
166 def test_is_not_binary(self, tmpfile):
167 binary_path = tmpfile(contents='asd\n\nlkjh0')
168 assert system.is_binary(binary_path) is False
94b18763
FG
169
170
1adf2230
AA
171class TestGetFileContents(object):
172
173 def test_path_does_not_exist(self, tmpdir):
174 filepath = os.path.join(str(tmpdir), 'doesnotexist')
175 assert system.get_file_contents(filepath, 'default') == 'default'
176
177 def test_path_has_contents(self, tmpfile):
178 interesting_file = tmpfile(contents="1")
179 result = system.get_file_contents(interesting_file)
180 assert result == "1"
181
182 def test_path_has_multiline_contents(self, tmpfile):
183 interesting_file = tmpfile(contents="0\n1")
184 result = system.get_file_contents(interesting_file)
185 assert result == "0\n1"
186
187 def test_exception_returns_default(self, tmpfile):
188 interesting_file = tmpfile(contents="0")
189 # remove read, causes IOError
190 os.chmod(interesting_file, 0o000)
191 result = system.get_file_contents(interesting_file)
192 assert result == ''
193
194
94b18763
FG
195class TestWhich(object):
196
197 def test_executable_exists_but_is_not_file(self, monkeypatch):
198 monkeypatch.setattr(system.os.path, 'isfile', lambda x: False)
199 monkeypatch.setattr(system.os.path, 'exists', lambda x: True)
200 assert system.which('exedir') == 'exedir'
201
202 def test_executable_does_not_exist(self, monkeypatch):
203 monkeypatch.setattr(system.os.path, 'isfile', lambda x: False)
204 monkeypatch.setattr(system.os.path, 'exists', lambda x: False)
205 assert system.which('exedir') == 'exedir'
206
207 def test_executable_exists_as_file(self, monkeypatch):
208 monkeypatch.setattr(system.os.path, 'isfile', lambda x: True)
209 monkeypatch.setattr(system.os.path, 'exists', lambda x: True)
210 assert system.which('ceph') == '/usr/local/bin/ceph'
211
212 def test_warnings_when_executable_isnt_matched(self, monkeypatch, capsys):
213 monkeypatch.setattr(system.os.path, 'isfile', lambda x: True)
214 monkeypatch.setattr(system.os.path, 'exists', lambda x: False)
215 system.which('exedir')
216 stdout, stderr = capsys.readouterr()
217 assert 'Absolute path not found for executable: exedir' in stdout
218 assert 'Ensure $PATH environment variable contains common executable locations' in stdout