]>
Commit | Line | Data |
---|---|---|
d2e6a577 FG |
1 | import os |
2 | import pwd | |
3 | import getpass | |
3efd9988 | 4 | import pytest |
d2e6a577 FG |
5 | from textwrap import dedent |
6 | from ceph_volume.util import system | |
7 | ||
8 | ||
class TestMkdirP(object):
    """Exercise ``system.mkdir_p`` on existing and new paths, with and without chown."""

    @staticmethod
    def _patch_ceph_ids_with_current_user(monkeypatch):
        """Make ``system.get_ceph_user_ids`` return the current user's (uid, gid)."""
        entry = pwd.getpwnam(getpass.getuser())
        ids = (entry.pw_uid, entry.pw_gid)
        # presumably lets the chown step run without the real ceph user -- confirm
        monkeypatch.setattr(system, 'get_ceph_user_ids', lambda: ids)

    def test_existing_dir_does_not_raise_w_chown(self, monkeypatch, tmpdir):
        """Calling mkdir_p on a directory that already exists must not raise."""
        self._patch_ceph_ids_with_current_user(monkeypatch)
        existing = str(tmpdir)
        system.mkdir_p(existing)
        assert os.path.isdir(existing)

    def test_new_dir_w_chown(self, monkeypatch, tmpdir):
        """A missing directory is created when the default chown is in effect."""
        self._patch_ceph_ids_with_current_user(monkeypatch)
        target = os.path.join(str(tmpdir), 'new')
        system.mkdir_p(target)
        assert os.path.isdir(target)

    def test_existing_dir_does_not_raise_no_chown(self, tmpdir):
        """With ``chown=False`` an already existing directory is accepted."""
        existing = str(tmpdir)
        system.mkdir_p(existing, chown=False)
        assert os.path.isdir(existing)

    def test_new_dir_no_chown(self, tmpdir):
        """With ``chown=False`` a missing directory is still created."""
        target = os.path.join(str(tmpdir), 'new')
        system.mkdir_p(target, chown=False)
        assert os.path.isdir(target)
36 | ||
37 | ||
3efd9988 FG |
@pytest.fixture
def fake_proc(tmpdir, monkeypatch):
    """Provide a canned ``mounts`` table for the mount-related tests.

    Writes a fixed mounts listing to ``<tmpdir>/mounts``, points
    ``system.PROCDIR`` at that directory and patches ``os.path.exists`` to
    always return ``True``.  Both patches are undone by ``monkeypatch`` when
    the test finishes.  The fixture itself yields no value; tests request it
    only for its side effects.
    """
    PROCDIR = str(tmpdir)
    proc_path = os.path.join(PROCDIR, 'mounts')
    with open(proc_path, 'w') as f:
        # The literal opens with a line continuation so that every row shares
        # the same margin; otherwise the first row sits at column 0 and
        # dedent() has no common indentation to strip.
        f.write(dedent("""\
            nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
            rootfs / rootfs rw 0 0
            sysfs /sys sysfs rw,seclabel,nosuid,nodev,noexec,relatime 0 0
            proc /proc proc rw,nosuid,nodev,noexec,relatime 0 0
            devtmpfs /dev devtmpfs rw,seclabel,nosuid,size=238292k,nr_inodes=59573,mode=755 0 0
            securityfs /sys/kernel/security securityfs rw,nosuid,nodev,noexec,relatime 0 0
            tmpfs /dev/shm tmpfs rw,seclabel,nosuid,nodev 0 0
            devpts /dev/pts devpts rw,seclabel,nosuid,noexec,relatime,gid=5,mode=620,ptmxmode=000 0 0
            tmpfs /run tmpfs rw,seclabel,nosuid,nodev,mode=755 0 0
            tmpfs /sys/fs/cgroup tmpfs ro,seclabel,nosuid,nodev,noexec,mode=755 0 0
            cgroup /sys/fs/cgroup/systemd cgroup rw,nosuid,nodev,noexec,relatime,xattr,release_agent=/usr/lib/systemd/systemd-cgroups-agent,name=systemd 0 0
            cgroup /sys/fs/cgroup/freezer cgroup rw,nosuid,nodev,noexec,relatime,freezer 0 0
            configfs /sys/kernel/config configfs rw,relatime 0 0
            /dev/mapper/VolGroup00-LogVol00 / xfs rw,seclabel,relatime,attr2,inode64,noquota 0 0
            selinuxfs /sys/fs/selinux selinuxfs rw,relatime 0 0
            debugfs /sys/kernel/debug debugfs rw,relatime 0 0
            hugetlbfs /dev/hugepages hugetlbfs rw,seclabel,relatime 0 0
            mqueue /dev/mqueue mqueue rw,seclabel,relatime 0 0
            sunrpc /far/lib/nfs/rpc_pipefs rpc_pipefs rw,relatime 0 0
            /dev/sde4 /two/field/path
            nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
            /dev/sde2 /boot xfs rw,seclabel,relatime,attr2,inode64,noquota 0 0
            tmpfs /far/lib/ceph/osd/ceph-5 tmpfs rw,seclabel,relatime 0 0
            tmpfs /far/lib/ceph/osd/ceph-7 tmpfs rw,seclabel,relatime 0 0
            /dev/sda1 /far/lib/ceph/osd/ceph-0 xfs rw,seclabel,noatime,attr2,inode64,noquota 0 0
            tmpfs /run/user/1000 tmpfs rw,seclabel,nosuid,nodev,relatime,size=50040k,mode=700,uid=1000,gid=1000 0 0
            /dev/sdc2 /boot xfs rw,seclabel,relatime,attr2,inode64,noquota 0 0
            tmpfs /run/user/1000 tmpfs rw,seclabel,mode=700,uid=1000,gid=1000 0 0"""))
    monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
    # presumably so the device paths named in the table count as present on
    # the test host -- confirm against system.get_mounts
    monkeypatch.setattr(os.path, 'exists', lambda x: True)
73 | ||
74 | ||
class TestPathIsMounted(object):
    """Checks for ``system.path_is_mounted`` against the ``fake_proc`` table."""

    def test_is_mounted(self, fake_proc):
        """A path that appears as a mount point in the table is reported mounted."""
        mounted = system.path_is_mounted('/boot')
        assert mounted is True

    def test_is_not_mounted(self, fake_proc):
        """A path absent from the table is reported as not mounted."""
        mounted = system.path_is_mounted('/far/fib/feph')
        assert mounted is False

    def test_is_not_mounted_at_destination(self, fake_proc):
        """``/boot`` is in the table, but not paired with ``/dev/sda1``."""
        mounted = system.path_is_mounted('/boot', destination='/dev/sda1')
        assert mounted is False

    def test_is_mounted_at_destination(self, fake_proc):
        """``/boot`` is paired with ``/dev/sdc2`` in the table."""
        mounted = system.path_is_mounted('/boot', destination='/dev/sdc2')
        assert mounted is True
88 | ||
89 | ||
class TestDeviceIsMounted(object):
    """Checks for ``system.device_is_mounted`` against the ``fake_proc`` table."""

    def test_is_mounted(self, fake_proc):
        """``/dev/sda1`` has a row in the table, so it counts as mounted."""
        assert system.device_is_mounted('/dev/sda1') is True

    def test_path_is_not_device(self, fake_proc):
        """A mount point passed in place of a device is not reported as mounted."""
        assert system.device_is_mounted('/far/lib/ceph/osd/ceph-7') is False

    def test_is_not_mounted_at_destination(self, fake_proc):
        """The device is mounted, but not at the requested destination."""
        # Destination that does not appear in the table at all.
        assert system.device_is_mounted('/dev/sda1', destination='/far/lib/ceph/osd/test-1') is False
        # Destination that is in the table, but as a tmpfs mount rather than
        # a mount of /dev/sda1.
        assert system.device_is_mounted('/dev/sda1', destination='/far/lib/ceph/osd/ceph-7') is False

    def test_is_mounted_at_destination(self, fake_proc):
        """The device is mounted at exactly the requested destination."""
        # The table mounts /dev/sda1 at ceph-0.  This test used to assert
        # False for ceph-7, which only repeated the negative case above and
        # left the positive path untested.
        # NOTE(review): expected value inferred from the fixture row; confirm
        # against system.device_is_mounted.
        assert system.device_is_mounted('/dev/sda1', destination='/far/lib/ceph/osd/ceph-0') is True
103 | ||
104 | ||
class TestGetMounts(object):
    """Checks for ``system.get_mounts`` against hand-written mounts tables."""

    @staticmethod
    def _write_mounts(tmpdir, monkeypatch, contents):
        """Write ``contents`` to ``<tmpdir>/mounts`` and point ``system.PROCDIR`` there."""
        procdir = str(tmpdir)
        with open(os.path.join(procdir, 'mounts'), 'w') as f:
            f.write(contents)
        monkeypatch.setattr(system, 'PROCDIR', procdir)

    def test_not_mounted(self, tmpdir, monkeypatch):
        """An empty mounts file yields an empty mapping."""
        self._write_mounts(tmpdir, monkeypatch, '')
        assert system.get_mounts() == {}

    def test_is_mounted_(self, fake_proc):
        """A device row is reported with its mount point."""
        result = system.get_mounts()
        assert result['/dev/sdc2'] == ['/boot']

    def test_ignores_two_fields(self, fake_proc):
        """The ``/dev/sde4`` row has only two fields and must not be reported."""
        result = system.get_mounts()
        assert result.get('/dev/sde4') is None

    def test_tmpfs_is_reported(self, fake_proc):
        """The first tmpfs row in the table mounts ``/dev/shm``."""
        result = system.get_mounts()
        assert result['tmpfs'][0] == '/dev/shm'

    def test_non_skip_devs_arent_reported(self, fake_proc):
        """The table contains cgroup rows, yet no ``cgroup`` key is returned."""
        result = system.get_mounts()
        assert result.get('cgroup') is None

    def test_multiple_mounts_are_appended(self, fake_proc):
        """All seven tmpfs rows of the table are collected under one key."""
        result = system.get_mounts()
        assert len(result['tmpfs']) == 7

    def test_nonexistent_devices_are_skipped(self, tmpdir, monkeypatch):
        """A device whose path does not exist is left out of the result."""
        # The literal opens with a line continuation so dedent() sees a
        # common margin and really strips the indentation of every row.
        self._write_mounts(tmpdir, monkeypatch, dedent("""\
            nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
            /dev/sda1 /far/lib/ceph/osd/ceph-0 xfs rw,attr2,inode64,noquota 0 0
            /dev/sda2 /far/lib/ceph/osd/ceph-1 xfs rw,attr2,inode64,noquota 0 0"""))
        # Only /dev/sda1 is made to look missing; every other path "exists".
        monkeypatch.setattr(os.path, 'exists', lambda x: x != '/dev/sda1')
        result = system.get_mounts()
        assert result.get('/dev/sda1') is None
146 | ||
147 | ||
class TestIsBinary(object):
    """Checks for ``system.is_binary`` using files built by the ``tmpfile`` fixture."""

    def test_is_binary(self, tmpfile):
        """Contents ending in a NUL byte are classified as binary."""
        nul_terminated = tmpfile(contents='asd\n\nlkjh\x00')
        assert system.is_binary(nul_terminated)

    def test_is_not_binary(self, tmpfile):
        """The same contents ending in the character ``0`` are not binary."""
        text_path = tmpfile(contents='asd\n\nlkjh0')
        verdict = system.is_binary(text_path)
        assert verdict is False