]> git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/util/test_system.py
update sources to v12.1.3
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / util / test_system.py
1 import os
2 import pwd
3 import getpass
4 from textwrap import dedent
5 from ceph_volume.util import system
6
7
8 class TestMkdirP(object):
9
10 def test_existing_dir_does_not_raise_w_chown(self, monkeypatch, tmpdir):
11 user = pwd.getpwnam(getpass.getuser())
12 uid, gid = user[2], user[3]
13 monkeypatch.setattr(system, 'get_ceph_user_ids', lambda: (uid, gid,))
14 path = str(tmpdir)
15 system.mkdir_p(path)
16 assert os.path.isdir(path)
17
18 def test_new_dir_w_chown(self, monkeypatch, tmpdir):
19 user = pwd.getpwnam(getpass.getuser())
20 uid, gid = user[2], user[3]
21 monkeypatch.setattr(system, 'get_ceph_user_ids', lambda: (uid, gid,))
22 path = os.path.join(str(tmpdir), 'new')
23 system.mkdir_p(path)
24 assert os.path.isdir(path)
25
26 def test_existing_dir_does_not_raise_no_chown(self, tmpdir):
27 path = str(tmpdir)
28 system.mkdir_p(path, chown=False)
29 assert os.path.isdir(path)
30
31 def test_new_dir_no_chown(self, tmpdir):
32 path = os.path.join(str(tmpdir), 'new')
33 system.mkdir_p(path, chown=False)
34 assert os.path.isdir(path)
35
36
37 class TestIsMounted(object):
38
39 def test_not_mounted(self, tmpdir, monkeypatch):
40 PROCDIR = str(tmpdir)
41 proc_path = os.path.join(PROCDIR, 'mounts')
42 with open(proc_path, 'w') as f:
43 f.write('')
44 monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
45 assert system.is_mounted('sdb') is False
46
47 def test_is_mounted_(self, tmpdir, monkeypatch):
48 PROCDIR = str(tmpdir)
49 proc_path = os.path.join(PROCDIR, 'mounts')
50 with open(proc_path, 'w') as f:
51 f.write(dedent("""nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
52 /dev/sdc2 /boot xfs rw,seclabel,relatime,attr2,inode64,noquota 0 0
53 tmpfs /run/user/1000 tmpfs rw,seclabel,mode=700,uid=1000,gid=1000 0 0"""))
54 monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
55 monkeypatch.setattr(os.path, 'exists', lambda x: True)
56 assert system.is_mounted('/dev/sdc2') is True
57
58 def test_ignores_two_fields(self, tmpdir, monkeypatch):
59 PROCDIR = str(tmpdir)
60 proc_path = os.path.join(PROCDIR, 'mounts')
61 with open(proc_path, 'w') as f:
62 f.write(dedent("""nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
63 /dev/sdc2 /boot
64 tmpfs /run/user/1000 tmpfs rw,seclabel,mode=700,uid=1000,gid=1000 0 0"""))
65 monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
66 monkeypatch.setattr(os.path, 'exists', lambda x: True)
67 assert system.is_mounted('/dev/sdc2') is False
68
69 def test_not_mounted_at_destination(self, tmpdir, monkeypatch):
70 PROCDIR = str(tmpdir)
71 proc_path = os.path.join(PROCDIR, 'mounts')
72 with open(proc_path, 'w') as f:
73 f.write(dedent("""nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
74 /dev/sdc2 /var/lib/ceph/osd/ceph-9 xfs rw,attr2,inode64,noquota 0 0
75 tmpfs /run/user/1000 tmpfs rw,seclabel,mode=700,uid=1000,gid=1000 0 0"""))
76 monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
77 monkeypatch.setattr(os.path, 'exists', lambda x: True)
78 assert system.is_mounted('/dev/sdc2', '/var/lib/ceph/osd/ceph-0') is False
79
80 def test_is_mounted_at_destination(self, tmpdir, monkeypatch):
81 PROCDIR = str(tmpdir)
82 proc_path = os.path.join(PROCDIR, 'mounts')
83 with open(proc_path, 'w') as f:
84 f.write(dedent("""nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
85 /dev/sdc2 /var/lib/ceph/osd/ceph-0 xfs rw,attr2,inode64,noquota 0 0
86 tmpfs /run/user/1000 tmpfs rw,seclabel,mode=700,uid=1000,gid=1000 0 0"""))
87 monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
88 monkeypatch.setattr(os.path, 'exists', lambda x: True)
89 assert system.is_mounted('/dev/sdc2', '/var/lib/ceph/osd/ceph-0') is True