]>
git.proxmox.com Git - ceph.git/blob - ceph/src/cephadm/tests/test_util_funcs.py
270753a554b86e0a2cf65bcb442a95aaebb39974
1 # Tests for various assorted utility functions found within cephadm
3 from unittest
import mock
12 from tests
.fixtures
import with_cephadm_ctx
, import_cephadm
14 _cephadm
= import_cephadm()
18 def _copy_tree(self
, *args
, **kwargs
):
19 with
with_cephadm_ctx([]) as ctx
:
20 with mock
.patch("cephadm.extract_uid_gid") as eug
:
21 eug
.return_value
= (os
.getuid(), os
.getgid())
22 _cephadm
.copy_tree(ctx
, *args
, **kwargs
)
24 def test_one_dir(self
, tmp_path
):
25 """Copy one dir into a non-existing dest dir."""
26 src1
= tmp_path
/ "src1"
27 dst
= tmp_path
/ "dst"
28 src1
.mkdir(parents
=True)
30 with (src1
/ "foo.txt").open("w") as fh
:
34 assert not (dst
/ "foo.txt").exists()
36 self
._copy
_tree
([src1
], dst
)
37 assert (dst
/ "foo.txt").exists()
39 def test_one_existing_dir(self
, tmp_path
):
40 """Copy one dir into an existing dest dir."""
41 src1
= tmp_path
/ "src1"
42 dst
= tmp_path
/ "dst"
43 src1
.mkdir(parents
=True)
44 dst
.mkdir(parents
=True)
46 with (src1
/ "foo.txt").open("w") as fh
:
50 assert not (dst
/ "src1").exists()
52 self
._copy
_tree
([src1
], dst
)
53 assert (dst
/ "src1/foo.txt").exists()
55 def test_two_dirs(self
, tmp_path
):
56 """Copy two source directories into an existing dest dir."""
57 src1
= tmp_path
/ "src1"
58 src2
= tmp_path
/ "src2"
59 dst
= tmp_path
/ "dst"
60 src1
.mkdir(parents
=True)
61 src2
.mkdir(parents
=True)
62 dst
.mkdir(parents
=True)
64 with (src1
/ "foo.txt").open("w") as fh
:
67 with (src2
/ "bar.txt").open("w") as fh
:
71 assert not (dst
/ "src1").exists()
72 assert not (dst
/ "src2").exists()
74 self
._copy
_tree
([src1
, src2
], dst
)
75 assert (dst
/ "src1/foo.txt").exists()
76 assert (dst
/ "src2/bar.txt").exists()
78 def test_one_dir_set_uid(self
, tmp_path
):
79 """Explicity pass uid/gid values and assert these are passed to chown."""
80 # Because this test will often be run by non-root users it is necessary
81 # to mock os.chown or we too easily run into perms issues.
82 src1
= tmp_path
/ "src1"
83 dst
= tmp_path
/ "dst"
84 src1
.mkdir(parents
=True)
86 with (src1
/ "foo.txt").open("w") as fh
:
90 assert not (dst
/ "foo.txt").exists()
92 with mock
.patch("os.chown") as _chown
:
93 _chown
.return_value
= None
94 self
._copy
_tree
([src1
], dst
, uid
=0, gid
=0)
95 assert len(_chown
.mock_calls
) >= 2
96 for c
in _chown
.mock_calls
:
97 assert c
== mock
.call(mock
.ANY
, 0, 0)
98 assert (dst
/ "foo.txt").exists()
102 def _copy_files(self
, *args
, **kwargs
):
103 with
with_cephadm_ctx([]) as ctx
:
104 with mock
.patch("cephadm.extract_uid_gid") as eug
:
105 eug
.return_value
= (os
.getuid(), os
.getgid())
106 _cephadm
.copy_files(ctx
, *args
, **kwargs
)
108 def test_one_file(self
, tmp_path
):
109 """Copy one file into the dest dir."""
110 file1
= tmp_path
/ "f1.txt"
111 dst
= tmp_path
/ "dst"
112 dst
.mkdir(parents
=True)
114 with file1
.open("w") as fh
:
115 fh
.write("its test time\n")
117 self
._copy
_files
([file1
], dst
)
118 assert (dst
/ "f1.txt").exists()
120 def test_one_file_nodest(self
, tmp_path
):
121 """Copy one file to the given destination path."""
122 file1
= tmp_path
/ "f1.txt"
123 dst
= tmp_path
/ "dst"
125 with file1
.open("w") as fh
:
126 fh
.write("its test time\n")
128 self
._copy
_files
([file1
], dst
)
129 assert not dst
.is_dir()
131 assert dst
.open("r").read() == "its test time\n"
133 def test_three_files(self
, tmp_path
):
134 """Copy one file into the dest dir."""
135 file1
= tmp_path
/ "f1.txt"
136 file2
= tmp_path
/ "f2.txt"
137 file3
= tmp_path
/ "f3.txt"
138 dst
= tmp_path
/ "dst"
139 dst
.mkdir(parents
=True)
141 with file1
.open("w") as fh
:
142 fh
.write("its test time\n")
143 with file2
.open("w") as fh
:
145 with file3
.open("w") as fh
:
148 self
._copy
_files
([file1
, file2
, file3
], dst
)
149 assert (dst
/ "f1.txt").exists()
150 assert (dst
/ "f2.txt").exists()
151 assert (dst
/ "f3.txt").exists()
153 def test_three_files_nodest(self
, tmp_path
):
154 """Copy files to dest path (not a dir). This is not a useful operation."""
155 file1
= tmp_path
/ "f1.txt"
156 file2
= tmp_path
/ "f2.txt"
157 file3
= tmp_path
/ "f3.txt"
158 dst
= tmp_path
/ "dst"
160 with file1
.open("w") as fh
:
161 fh
.write("its test time\n")
162 with file2
.open("w") as fh
:
164 with file3
.open("w") as fh
:
167 self
._copy
_files
([file1
, file2
, file3
], dst
)
168 assert not dst
.is_dir()
170 assert dst
.open("r").read() == "f3\n"
172 def test_one_file_set_uid(self
, tmp_path
):
173 """Explicity pass uid/gid values and assert these are passed to chown."""
174 # Because this test will often be run by non-root users it is necessary
175 # to mock os.chown or we too easily run into perms issues.
176 file1
= tmp_path
/ "f1.txt"
177 dst
= tmp_path
/ "dst"
178 dst
.mkdir(parents
=True)
180 with file1
.open("w") as fh
:
181 fh
.write("its test time\n")
183 assert not (dst
/ "f1.txt").exists()
185 with mock
.patch("os.chown") as _chown
:
186 _chown
.return_value
= None
187 self
._copy
_files
([file1
], dst
, uid
=0, gid
=0)
188 assert len(_chown
.mock_calls
) >= 1
189 for c
in _chown
.mock_calls
:
190 assert c
== mock
.call(mock
.ANY
, 0, 0)
191 assert (dst
/ "f1.txt").exists()
195 def _move_files(self
, *args
, **kwargs
):
196 with
with_cephadm_ctx([]) as ctx
:
197 with mock
.patch("cephadm.extract_uid_gid") as eug
:
198 eug
.return_value
= (os
.getuid(), os
.getgid())
199 _cephadm
.move_files(ctx
, *args
, **kwargs
)
201 def test_one_file(self
, tmp_path
):
202 """Move a named file to test dest path."""
203 file1
= tmp_path
/ "f1.txt"
204 dst
= tmp_path
/ "dst"
206 with file1
.open("w") as fh
:
207 fh
.write("lets moove\n")
209 assert not dst
.exists()
210 assert file1
.is_file()
212 self
._move
_files
([file1
], dst
)
214 assert not file1
.exists()
216 def test_one_file_destdir(self
, tmp_path
):
217 """Move a file into an existing dest dir."""
218 file1
= tmp_path
/ "f1.txt"
219 dst
= tmp_path
/ "dst"
220 dst
.mkdir(parents
=True)
222 with file1
.open("w") as fh
:
223 fh
.write("lets moove\n")
225 assert not (dst
/ "f1.txt").exists()
226 assert file1
.is_file()
228 self
._move
_files
([file1
], dst
)
229 assert (dst
/ "f1.txt").is_file()
230 assert not file1
.exists()
232 def test_one_file_one_link(self
, tmp_path
):
233 """Move a file and a symlink to that file to a dest dir."""
234 file1
= tmp_path
/ "f1.txt"
235 link1
= tmp_path
/ "lnk"
236 dst
= tmp_path
/ "dst"
237 dst
.mkdir(parents
=True)
239 with file1
.open("w") as fh
:
240 fh
.write("lets moove\n")
241 os
.symlink("f1.txt", link1
)
243 assert not (dst
/ "f1.txt").exists()
244 assert file1
.is_file()
245 assert link1
.exists()
247 self
._move
_files
([file1
, link1
], dst
)
248 assert (dst
/ "f1.txt").is_file()
249 assert (dst
/ "lnk").is_symlink()
250 assert not file1
.exists()
251 assert not link1
.exists()
252 assert (dst
/ "f1.txt").open("r").read() == "lets moove\n"
253 assert (dst
/ "lnk").open("r").read() == "lets moove\n"
255 def test_one_file_set_uid(self
, tmp_path
):
256 """Explicity pass uid/gid values and assert these are passed to chown."""
257 # Because this test will often be run by non-root users it is necessary
258 # to mock os.chown or we too easily run into perms issues.
259 file1
= tmp_path
/ "f1.txt"
260 dst
= tmp_path
/ "dst"
262 with file1
.open("w") as fh
:
263 fh
.write("lets moove\n")
265 assert not dst
.exists()
266 assert file1
.is_file()
268 with mock
.patch("os.chown") as _chown
:
269 _chown
.return_value
= None
270 self
._move
_files
([file1
], dst
, uid
=0, gid
=0)
271 assert len(_chown
.mock_calls
) >= 1
272 for c
in _chown
.mock_calls
:
273 assert c
== mock
.call(mock
.ANY
, 0, 0)
275 assert not file1
.exists()
278 def test_recursive_chown(tmp_path
):
279 d1
= tmp_path
/ "dir1"
281 f1
= d2
/ "file1.txt"
282 d2
.mkdir(parents
=True)
284 with f1
.open("w") as fh
:
285 fh
.write("low down\n")
287 with mock
.patch("os.chown") as _chown
:
288 _chown
.return_value
= None
289 _cephadm
.recursive_chown(str(d1
), uid
=500, gid
=500)
290 assert len(_chown
.mock_calls
) == 3
291 assert _chown
.mock_calls
[0] == mock
.call(str(d1
), 500, 500)
292 assert _chown
.mock_calls
[1] == mock
.call(str(d2
), 500, 500)
293 assert _chown
.mock_calls
[2] == mock
.call(str(f1
), 500, 500)
296 class TestFindExecutable
:
297 def test_standard_exe(self
):
298 # pretty much every system will have `true` on the path. It's a safe choice
299 # for the first assertion
300 exe
= _cephadm
.find_executable("true")
301 assert exe
.endswith("true")
303 def test_custom_path(self
, tmp_path
):
304 foo_sh
= tmp_path
/ "foo.sh"
305 with
open(foo_sh
, "w") as fh
:
306 fh
.write("#!/bin/sh\n")
307 fh
.write("echo foo\n")
310 exe
= _cephadm
.find_executable(foo_sh
)
311 assert str(exe
) == str(foo_sh
)
313 def test_no_path(self
, monkeypatch
):
314 monkeypatch
.delenv("PATH")
315 exe
= _cephadm
.find_executable("true")
316 assert exe
.endswith("true")
318 def test_no_path_no_confstr(self
, monkeypatch
):
320 raise ValueError("fail")
322 monkeypatch
.delenv("PATH")
323 monkeypatch
.setattr("os.confstr", _fail
)
324 exe
= _cephadm
.find_executable("true")
325 assert exe
.endswith("true")
327 def test_unset_path(self
):
328 exe
= _cephadm
.find_executable("true", path
="")
331 def test_no_such_exe(self
):
332 exe
= _cephadm
.find_executable("foo_bar-baz.noway")
336 def test_find_program():
337 exe
= _cephadm
.find_program("true")
338 assert exe
.endswith("true")
340 with pytest
.raises(ValueError):
341 _cephadm
.find_program("foo_bar-baz.noway")
344 def _mk_fake_call(enabled
, active
):
345 def _fake_call(ctx
, cmd
, **kwargs
):
346 if "is-enabled" in cmd
:
347 if isinstance(enabled
, Exception):
350 if "is-active" in cmd
:
351 if isinstance(active
, Exception):
354 raise ValueError("should not get here")
359 @pytest.mark
.parametrize(
360 "enabled_out, active_out, expected",
366 (True, "running", True),
369 # disabled, unknown if active
372 (False, "unknown", True),
375 # is-enabled error (not disabled, unknown if active
378 (False, "unknown", False),
381 # is-enabled ok, inactive is stopped
384 (True, "stopped", True),
387 # is-enabled ok, failed is error
390 (True, "error", True),
393 # is-enabled ok, auto-restart is error
395 ("auto-restart", "", 0),
396 (True, "error", True),
399 # error exec'ing is-enabled cmd
402 (False, "running", False),
405 # error exec'ing is-enabled cmd
408 (True, "unknown", True),
412 def test_check_unit(enabled_out
, active_out
, expected
):
413 with
with_cephadm_ctx([]) as ctx
:
414 _cephadm
.call
.side_effect
= _mk_fake_call(
418 enabled
, state
, installed
= _cephadm
.check_unit(ctx
, "foobar")
419 assert (enabled
, state
, installed
) == expected
423 def __init__(self
, should_be_called
):
424 self
._should
_be
_called
= should_be_called
427 def enable_service(self
, service
):
428 self
._services
.append(service
)
430 def check_expected(self
):
431 if not self
._should
_be
_called
:
432 assert not self
._services
434 # there are currently seven chron/chrony type services that
435 # cephadm looks for. Make sure it probed for each of them
436 # or more in case someone adds to the list.
437 assert len(self
._services
) >= 7
438 assert "chrony.service" in self
._services
439 assert "ntp.service" in self
._services
442 @pytest.mark
.parametrize(
443 "call_fn, enabler, expected",
445 # Test that time sync services are not enabled
454 # Test that time sync service is enabled
458 active
=("active", "", 0),
463 # Test that time sync is not enabled, and try to enable them.
464 # This one needs to be not running, but installed in order to
465 # call the enabler. It should call the enabler with every known
469 enabled
=("disabled", "", 1),
475 # Test that time sync is enabled, with an enabler passed which
476 # will check that the enabler was never called.
480 active
=("active", "", 0),
487 def test_check_time_sync(call_fn
, enabler
, expected
):
488 """The check_time_sync call actually checks if a time synchronization service
489 is enabled. It is also the only consumer of check_units.
491 with
with_cephadm_ctx([]) as ctx
:
492 _cephadm
.call
.side_effect
= call_fn
493 result
= _cephadm
.check_time_sync(ctx
, enabler
=enabler
)
494 assert result
== expected
495 if enabler
is not None:
496 enabler
.check_expected()
499 @pytest.mark
.parametrize(
509 """# A sample from a real centos system
513 ID_LIKE="rhel fedora"
515 PLATFORM_ID="platform:el8"
516 PRETTY_NAME="CentOS Stream 8"
518 CPE_NAME="cpe:/o:centos:centos:8"
519 HOME_URL="https://centos.org/"
520 BUG_REPORT_URL="https://bugzilla.redhat.com/"
521 REDHAT_SUPPORT_PRODUCT="Red Hat Enterprise Linux 8"
522 REDHAT_SUPPORT_PRODUCT_VERSION="CentOS Stream"
524 ("centos", "8", None),
527 """# Minimal but complete, made up vals
530 VERSION_CODENAME="hpec nimda"
532 ("hpec", "33", "hpec nimda"),
535 """# Minimal but complete, no quotes
538 VERSION_CODENAME=hpec nimda
540 ("hpec", "33", "hpec nimda"),
544 def test_get_distro(monkeypatch
, content
, expected
):
545 def _fake_open(*args
, **kwargs
):
546 return io
.StringIO(content
)
548 monkeypatch
.setattr("builtins.open", _fake_open
)
549 assert _cephadm
.get_distro() == expected
553 """FakeContext is a minimal type for passing as a ctx, when
554 with_cephadm_ctx is not appropriate (it enables too many mocks, etc).
560 def _has_non_zero_exit(clog
):
561 assert any("Non-zero exit" in ll
for _
, _
, ll
in clog
.record_tuples
)
564 def _has_values_somewhere(clog
, values
, non_zero
=True):
566 _has_non_zero_exit(clog
)
568 assert any(value
in ll
for _
, _
, ll
in clog
.record_tuples
)
571 @pytest.mark
.parametrize(
572 "pyline, expected, call_kwargs, log_check",
575 "import time; time.sleep(0.1)",
582 "import sys; sys.exit(2)",
589 "import sys; sys.exit(0)",
593 id="success-with-desc",
596 "print('foo'); print('bar')",
597 ("foo\nbar\n", "", 0),
603 "import sys; sys.stderr.write('la\\nla\\nla\\n')",
604 ("", "la\nla\nla\n", 0),
610 "for i in range(501): print(i, flush=True)",
611 lambda r
: r
[2] == 0 and r
[1] == "" and "500" in r
[0].splitlines(),
617 "for i in range(1000000): print(i, flush=True)",
620 and len(r
[0].splitlines()) == 1000000,
623 id="stdout-very-long",
626 "import sys; sys.stderr.write('pow\\noof\\nouch\\n'); sys.exit(1)",
627 ("", "pow\noof\nouch\n", 1),
630 _has_values_somewhere
,
631 values
=["pow", "oof", "ouch"],
634 id="stderr-logged-non-zero",
637 "import time; time.sleep(4)",
644 "import time\nfor i in range(100):\n\tprint(i, flush=True); time.sleep(0.01)",
648 id="slow-print-timeout",
650 # Commands that time out collect no logs, return empty std{out,err} strings
653 def test_call(caplog
, monkeypatch
, pyline
, expected
, call_kwargs
, log_check
):
656 caplog
.set_level(logging
.INFO
)
657 monkeypatch
.setattr("cephadm.logger", logging
.getLogger())
659 result
= _cephadm
.call(ctx
, [sys
.executable
, "-c", pyline
], **call_kwargs
)
660 if callable(expected
):
661 assert expected(result
)
663 assert result
== expected
664 if callable(log_check
):
669 def test_success(self
, tmp_path
):
670 "Test the simple basic feature of writing a file."
671 dest
= tmp_path
/ "foo.txt"
672 with _cephadm
.write_new(dest
) as fh
:
673 fh
.write("something\n")
674 fh
.write("something else\n")
676 with
open(dest
, "r") as fh
:
677 assert fh
.read() == "something\nsomething else\n"
679 def test_write_ower_mode(self
, tmp_path
):
680 "Test that the owner and perms options function."
681 dest
= tmp_path
/ "foo.txt"
683 # if this is test run as non-root, we can't really change ownership
687 with _cephadm
.write_new(dest
, owner
=(uid
, gid
), perms
=0o600) as fh
:
688 fh
.write("xomething\n")
689 fh
.write("xomething else\n")
691 with
open(dest
, "r") as fh
:
692 assert fh
.read() == "xomething\nxomething else\n"
693 sr
= os
.fstat(fh
.fileno())
694 assert sr
.st_uid
== uid
695 assert sr
.st_gid
== gid
696 assert (sr
.st_mode
& 0o777) == 0o600
698 def test_encoding(self
, tmp_path
):
699 "Test that the encoding option functions."
700 dest
= tmp_path
/ "foo.txt"
701 msg
= "\u2603\u26C5\n"
702 with _cephadm
.write_new(dest
, encoding
='utf-8') as fh
:
704 with
open(dest
, "rb") as fh
:
706 assert b1
.decode('utf-8') == msg
708 dest
= tmp_path
/ "foo2.txt"
709 with _cephadm
.write_new(dest
, encoding
='utf-16le') as fh
:
711 with
open(dest
, "rb") as fh
:
713 assert b2
.decode('utf-16le') == msg
715 # the binary data should differ due to the different encodings
718 def test_cleanup(self
, tmp_path
):
719 "Test that an exception during write leaves no file behind."
720 dest
= tmp_path
/ "foo.txt"
721 with pytest
.raises(ValueError):
722 with _cephadm
.write_new(dest
) as fh
:
724 raise ValueError("foo")
726 assert not dest
.exists()
727 assert not dest
.with_name(dest
.name
+".new").exists()
728 assert list(dest
.parent
.iterdir()) == []
731 class CompareContext1
:
735 "image": "fake.io/noway/nohow:gndn",
738 "vegetable": "carrot",
741 "osd_fsid": "robble",
742 "tcp_ports": [404, 9999],
745 "alpha": {"sloop": "John B"},
746 "beta": {"forest": "birch"},
747 "gamma": {"forest": "pine"},
751 def check(self
, ctx
):
752 assert ctx
.name
== 'mane'
753 assert ctx
.fsid
== 'foobar'
754 assert ctx
.image
== 'fake.io/noway/nohow:gndn'
755 assert ctx
.meta_properties
== {"fruit": "banana", "vegetable": "carrot"}
756 assert ctx
.config_blobs
== {
757 "alpha": {"sloop": "John B"},
758 "beta": {"forest": "birch"},
759 "gamma": {"forest": "pine"},
761 assert ctx
.osd_fsid
== "robble"
762 assert ctx
.tcp_ports
== [404, 9999]
765 class CompareContext2
:
771 "vegetable": "carrot",
775 "alpha": {"sloop": "John B"},
776 "beta": {"forest": "birch"},
777 "gamma": {"forest": "pine"},
781 def check(self
, ctx
):
782 assert ctx
.name
== 'cc2'
783 assert ctx
.fsid
== 'foobar'
784 assert ctx
.image
== 'quay.io/ceph/ceph:v18'
785 assert ctx
.meta_properties
== {"fruit": "banana", "vegetable": "carrot"}
786 assert ctx
.config_blobs
== {
787 "alpha": {"sloop": "John B"},
788 "beta": {"forest": "birch"},
789 "gamma": {"forest": "pine"},
791 assert ctx
.osd_fsid
is None
792 assert ctx
.tcp_ports
is None
795 @pytest.mark
.parametrize(
802 def test_apply_deploy_config_to_ctx(cc
, monkeypatch
):
805 monkeypatch
.setattr("cephadm.logger", logging
.getLogger())
807 _cephadm
.apply_deploy_config_to_ctx(cc
.cfg_data
, ctx
)