# Origin: ceph/src/ceph-volume/ceph_volume/tests/util/test_system.py
# (ceph.git as published on git.proxmox.com)
import getpass
import os
import pwd
from textwrap import dedent

import pytest

from ceph_volume.util import system
class TestMkdirP(object):
    """Checks that ``system.mkdir_p`` leaves a directory in place.

    Covers both an already-existing directory and a new one, with the
    ``chown`` option left at its default and explicitly disabled.
    """

    def test_existing_dir_does_not_raise_w_chown(self, monkeypatch, tmpdir):
        """Calling mkdir_p on an existing directory must not raise."""
        # Report the current user's ids as the "ceph" ids; presumably
        # mkdir_p consults get_ceph_user_ids when chown is enabled, and the
        # real ceph account may not exist on the test host.
        user = pwd.getpwnam(getpass.getuser())
        uid, gid = user[2], user[3]
        monkeypatch.setattr(system, 'get_ceph_user_ids', lambda: (uid, gid,))
        # tmpdir itself already exists, which is the case under test.
        path = str(tmpdir)
        system.mkdir_p(path)
        assert os.path.isdir(path)

    def test_new_dir_w_chown(self, monkeypatch, tmpdir):
        """A directory that does not exist yet is created."""
        user = pwd.getpwnam(getpass.getuser())
        uid, gid = user[2], user[3]
        monkeypatch.setattr(system, 'get_ceph_user_ids', lambda: (uid, gid,))
        path = os.path.join(str(tmpdir), 'new')
        system.mkdir_p(path)
        assert os.path.isdir(path)

    def test_existing_dir_does_not_raise_no_chown(self, tmpdir):
        """Existing directory, with the ownership change disabled."""
        path = str(tmpdir)
        system.mkdir_p(path, chown=False)
        assert os.path.isdir(path)

    def test_new_dir_no_chown(self, tmpdir):
        """New directory, with the ownership change disabled."""
        path = os.path.join(str(tmpdir), 'new')
        system.mkdir_p(path, chown=False)
        assert os.path.isdir(path)
@pytest.fixture
def fake_proc(tmpdir, monkeypatch):
    """Point ``system.PROCDIR`` at a temp directory holding a fake ``mounts``.

    The fake table deliberately contains a two-field line (``/dev/sde4``),
    repeated mount points (``/boot``, ``/run/user/1000``) and several
    ``tmpfs`` entries, which the mount-related tests rely on.

    ``os.path.exists`` is patched to always return True so that device paths
    listed in the table are treated as present on the test host.
    """
    procdir = str(tmpdir)
    proc_path = os.path.join(procdir, 'mounts')
    # The leading backslash keeps the first entry on its own indented line so
    # dedent() strips the common margin; the written file has no leading
    # whitespace and no trailing newline.
    mounts = dedent("""\
        nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
        rootfs / rootfs rw 0 0
        sysfs /sys sysfs rw,seclabel,nosuid,nodev,noexec,relatime 0 0
        proc /proc proc rw,nosuid,nodev,noexec,relatime 0 0
        devtmpfs /dev devtmpfs rw,seclabel,nosuid,size=238292k,nr_inodes=59573,mode=755 0 0
        securityfs /sys/kernel/security securityfs rw,nosuid,nodev,noexec,relatime 0 0
        tmpfs /dev/shm tmpfs rw,seclabel,nosuid,nodev 0 0
        devpts /dev/pts devpts rw,seclabel,nosuid,noexec,relatime,gid=5,mode=620,ptmxmode=000 0 0
        tmpfs /run tmpfs rw,seclabel,nosuid,nodev,mode=755 0 0
        tmpfs /sys/fs/cgroup tmpfs ro,seclabel,nosuid,nodev,noexec,mode=755 0 0
        cgroup /sys/fs/cgroup/systemd cgroup rw,nosuid,nodev,noexec,relatime,xattr,release_agent=/usr/lib/systemd/systemd-cgroups-agent,name=systemd 0 0
        cgroup /sys/fs/cgroup/freezer cgroup rw,nosuid,nodev,noexec,relatime,freezer 0 0
        configfs /sys/kernel/config configfs rw,relatime 0 0
        /dev/mapper/VolGroup00-LogVol00 / xfs rw,seclabel,relatime,attr2,inode64,noquota 0 0
        selinuxfs /sys/fs/selinux selinuxfs rw,relatime 0 0
        debugfs /sys/kernel/debug debugfs rw,relatime 0 0
        hugetlbfs /dev/hugepages hugetlbfs rw,seclabel,relatime 0 0
        mqueue /dev/mqueue mqueue rw,seclabel,relatime 0 0
        sunrpc /far/lib/nfs/rpc_pipefs rpc_pipefs rw,relatime 0 0
        /dev/sde4 /two/field/path
        nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
        /dev/sde2 /boot xfs rw,seclabel,relatime,attr2,inode64,noquota 0 0
        tmpfs /far/lib/ceph/osd/ceph-5 tmpfs rw,seclabel,relatime 0 0
        tmpfs /far/lib/ceph/osd/ceph-7 tmpfs rw,seclabel,relatime 0 0
        /dev/sda1 /far/lib/ceph/osd/ceph-0 xfs rw,seclabel,noatime,attr2,inode64,noquota 0 0
        tmpfs /run/user/1000 tmpfs rw,seclabel,nosuid,nodev,relatime,size=50040k,mode=700,uid=1000,gid=1000 0 0
        /dev/sdc2 /boot xfs rw,seclabel,relatime,attr2,inode64,noquota 0 0
        tmpfs /run/user/1000 tmpfs rw,seclabel,mode=700,uid=1000,gid=1000 0 0""")
    with open(proc_path, 'w') as f:
        f.write(mounts)
    monkeypatch.setattr(system, 'PROCDIR', procdir)
    monkeypatch.setattr(os.path, 'exists', lambda x: True)
class TestPathIsMounted(object):
    """Checks ``system.path_is_mounted`` against the fake mounts table."""

    def test_is_mounted(self, fake_proc):
        """``/boot`` appears as a mount point in the fake table."""
        mounted = system.path_is_mounted('/boot')
        assert mounted is True

    def test_is_not_mounted(self, fake_proc):
        """A path absent from the fake table is reported as not mounted."""
        mounted = system.path_is_mounted('/far/fib/feph')
        assert mounted is False

    def test_is_not_mounted_at_destination(self, fake_proc):
        """``/boot`` is listed, but not for ``/dev/sda1``."""
        mounted = system.path_is_mounted('/boot', destination='/dev/sda1')
        assert mounted is False

    def test_is_mounted_at_destination(self, fake_proc):
        """``/boot`` is listed for ``/dev/sdc2`` in the fake table."""
        mounted = system.path_is_mounted('/boot', destination='/dev/sdc2')
        assert mounted is True
class TestDeviceIsMounted(object):
    """Checks ``system.device_is_mounted`` against the fake mounts table."""

    def test_is_mounted(self, fake_proc):
        """``/dev/sda1`` has an entry in the fake table."""
        assert system.device_is_mounted('/dev/sda1') is True

    def test_path_is_not_device(self, fake_proc):
        """A mount point passed in place of a device is not a match."""
        assert system.device_is_mounted('/far/lib/ceph/osd/ceph-7') is False

    def test_is_not_mounted_at_destination(self, fake_proc):
        """The device is listed, but not at the requested destination."""
        assert system.device_is_mounted(
            '/dev/sda1', destination='/far/lib/ceph/osd/test-1') is False

    def test_is_mounted_at_destination(self, fake_proc):
        """``ceph-7`` is a tmpfs mount in the fake table, not ``/dev/sda1``."""
        # NOTE(review): the test name suggests a positive match, yet the
        # assertion expects False; kept as written -- confirm the intent.
        assert system.device_is_mounted(
            '/dev/sda1', destination='/far/lib/ceph/osd/ceph-7') is False

    def test_is_realpath_dev_mounted_at_destination(self, fake_proc, monkeypatch):
        """A device path that resolves to ``/dev/sda1`` counts as mounted."""
        # Make any path containing 'foo' resolve to the real device node.
        monkeypatch.setattr(
            system.os.path, 'realpath',
            lambda x: '/dev/sda1' if 'foo' in x else x)
        result = system.device_is_mounted(
            '/dev/maper/foo', destination='/far/lib/ceph/osd/ceph-0')
        assert result is True

    def test_is_realpath_path_mounted_at_destination(self, fake_proc, monkeypatch):
        """A destination that resolves to the real mount point is a match."""
        # Make any path containing 'symlink' resolve to the mount point that
        # /dev/sda1 has in the fake table.
        monkeypatch.setattr(
            system.os.path, 'realpath',
            lambda x: '/far/lib/ceph/osd/ceph-0' if 'symlink' in x else x)
        result = system.device_is_mounted(
            '/dev/sda1', destination='/symlink/lib/ceph/osd/ceph-0')
        assert result is True
class TestGetMounts(object):
    """Checks the mapping returned by ``system.get_mounts``."""

    def test_not_mounted(self, tmpdir, monkeypatch):
        """An empty mounts file yields an empty mapping."""
        PROCDIR = str(tmpdir)
        proc_path = os.path.join(PROCDIR, 'mounts')
        with open(proc_path, 'w') as f:
            # Create the file with no entries at all.
            f.write('')
        monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
        assert system.get_mounts() == {}

    def test_is_mounted_(self, fake_proc):
        """A device maps to the list of its mount points."""
        result = system.get_mounts()
        assert result['/dev/sdc2'] == ['/boot']

    def test_ignores_two_fields(self, fake_proc):
        """The two-field ``/dev/sde4`` line in the fake table is dropped."""
        result = system.get_mounts()
        assert result.get('/dev/sde4') is None

    def test_tmpfs_is_reported(self, fake_proc):
        """``/dev/shm`` is the first tmpfs line in the fake table."""
        result = system.get_mounts()
        assert result['tmpfs'][0] == '/dev/shm'

    def test_non_skip_devs_arent_reported(self, fake_proc):
        """``cgroup`` entries from the fake table are not in the result."""
        result = system.get_mounts()
        assert result.get('cgroup') is None

    def test_multiple_mounts_are_appended(self, fake_proc):
        """All seven tmpfs lines of the fake table are kept, repeats included."""
        result = system.get_mounts()
        assert len(result['tmpfs']) == 7

    def test_nonexistent_devices_are_skipped(self, tmpdir, monkeypatch):
        """A device for which ``os.path.exists`` is False is left out."""
        PROCDIR = str(tmpdir)
        proc_path = os.path.join(PROCDIR, 'mounts')
        with open(proc_path, 'w') as f:
            f.write(dedent("""\
                nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
                /dev/sda1 /far/lib/ceph/osd/ceph-0 xfs rw,attr2,inode64,noquota 0 0
                /dev/sda2 /far/lib/ceph/osd/ceph-1 xfs rw,attr2,inode64,noquota 0 0"""))
        monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
        # Only /dev/sda1 is reported as missing; every other path "exists".
        monkeypatch.setattr(
            os.path, 'exists', lambda x: False if x == '/dev/sda1' else True)
        result = system.get_mounts()
        assert result.get('/dev/sda1') is None
class TestIsBinary(object):
    """Checks ``system.is_binary`` on files made by the ``tmpfile`` fixture."""

    def test_is_binary(self, tmpfile):
        """Contents ending in a NUL byte are expected to be flagged binary."""
        nul_terminated = tmpfile(contents='asd\n\nlkjh\x00')
        assert system.is_binary(nul_terminated)

    def test_is_not_binary(self, tmpfile):
        """The same text ending in a literal '0' is expected to be text."""
        plain_text = tmpfile(contents='asd\n\nlkjh0')
        assert system.is_binary(plain_text) is False