]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | # Tests for various assorted utility functions found within cephadm |
2 | # | |
3 | from unittest import mock | |
4 | ||
5 | import functools | |
6 | import io | |
7 | import os | |
8 | import sys | |
9 | ||
10 | import pytest | |
11 | ||
12 | from tests.fixtures import with_cephadm_ctx, import_cephadm | |
13 | ||
14 | _cephadm = import_cephadm() | |
15 | ||
16 | ||
class TestCopyTree:
    """Tests for ``_cephadm.copy_tree`` using directories under ``tmp_path``."""

    def _copy_tree(self, *args, **kwargs):
        """Invoke ``copy_tree`` with a throwaway cephadm context.

        ``cephadm.extract_uid_gid`` is patched so that it reports the uid/gid
        of the process running the tests.
        """
        with with_cephadm_ctx([]) as ctx, mock.patch(
            "cephadm.extract_uid_gid"
        ) as eug:
            eug.return_value = (os.getuid(), os.getgid())
            _cephadm.copy_tree(ctx, *args, **kwargs)

    def test_one_dir(self, tmp_path):
        """Copy one dir into a non-existing dest dir."""
        source = tmp_path / "src1"
        target = tmp_path / "dst"
        source.mkdir(parents=True)
        (source / "foo.txt").write_text("hello\nearth\n")

        copied = target / "foo.txt"
        assert not copied.exists()

        self._copy_tree([source], target)
        assert copied.exists()

    def test_one_existing_dir(self, tmp_path):
        """Copy one dir into an existing dest dir."""
        source = tmp_path / "src1"
        target = tmp_path / "dst"
        for directory in (source, target):
            directory.mkdir(parents=True)
        (source / "foo.txt").write_text("hello\nearth\n")

        assert not (target / "src1").exists()

        # The destination already exists, so the source directory is expected
        # to land *inside* it under its own name.
        self._copy_tree([source], target)
        assert (target / "src1/foo.txt").exists()

    def test_two_dirs(self, tmp_path):
        """Copy two source directories into an existing dest dir."""
        first = tmp_path / "src1"
        second = tmp_path / "src2"
        target = tmp_path / "dst"
        for directory in (first, second, target):
            directory.mkdir(parents=True)

        (first / "foo.txt").write_text("hello\nearth\n")
        (second / "bar.txt").write_text("goodbye\nmars\n")

        assert not (target / "src1").exists()
        assert not (target / "src2").exists()

        self._copy_tree([first, second], target)
        assert (target / "src1/foo.txt").exists()
        assert (target / "src2/bar.txt").exists()

    def test_one_dir_set_uid(self, tmp_path):
        """Explicitly pass uid/gid values and assert these are passed to chown."""
        # os.chown is mocked because the suite is usually run by a non-root
        # user, for whom a real chown to uid/gid 0 would fail.
        source = tmp_path / "src1"
        target = tmp_path / "dst"
        source.mkdir(parents=True)
        (source / "foo.txt").write_text("hello\nearth\n")

        assert not (target / "foo.txt").exists()

        with mock.patch("os.chown", return_value=None) as chown_mock:
            self._copy_tree([source], target, uid=0, gid=0)
            assert len(chown_mock.mock_calls) >= 2
            for recorded in chown_mock.mock_calls:
                assert recorded == mock.call(mock.ANY, 0, 0)
        assert (target / "foo.txt").exists()
99 | ||
100 | ||
class TestCopyFiles:
    """Tests for ``_cephadm.copy_files`` using files under ``tmp_path``."""

    def _copy_files(self, *args, **kwargs):
        """Invoke ``copy_files`` with a throwaway cephadm context.

        ``cephadm.extract_uid_gid`` is patched so that it reports the uid/gid
        of the process running the tests.
        """
        with with_cephadm_ctx([]) as ctx:
            with mock.patch("cephadm.extract_uid_gid") as eug:
                eug.return_value = (os.getuid(), os.getgid())
                _cephadm.copy_files(ctx, *args, **kwargs)

    def test_one_file(self, tmp_path):
        """Copy one file into the dest dir."""
        file1 = tmp_path / "f1.txt"
        dst = tmp_path / "dst"
        dst.mkdir(parents=True)

        with file1.open("w") as fh:
            fh.write("its test time\n")

        self._copy_files([file1], dst)
        assert (dst / "f1.txt").exists()

    def test_one_file_nodest(self, tmp_path):
        """Copy one file to the given destination path."""
        file1 = tmp_path / "f1.txt"
        dst = tmp_path / "dst"

        with file1.open("w") as fh:
            fh.write("its test time\n")

        self._copy_files([file1], dst)
        assert not dst.is_dir()
        assert dst.is_file()
        # read_text() closes the file; a bare open().read() leaks the handle.
        assert dst.read_text() == "its test time\n"

    def test_three_files(self, tmp_path):
        """Copy three files into the dest dir."""
        file1 = tmp_path / "f1.txt"
        file2 = tmp_path / "f2.txt"
        file3 = tmp_path / "f3.txt"
        dst = tmp_path / "dst"
        dst.mkdir(parents=True)

        with file1.open("w") as fh:
            fh.write("its test time\n")
        with file2.open("w") as fh:
            fh.write("f2\n")
        with file3.open("w") as fh:
            fh.write("f3\n")

        self._copy_files([file1, file2, file3], dst)
        assert (dst / "f1.txt").exists()
        assert (dst / "f2.txt").exists()
        assert (dst / "f3.txt").exists()

    def test_three_files_nodest(self, tmp_path):
        """Copy files to dest path (not a dir). This is not a useful operation."""
        file1 = tmp_path / "f1.txt"
        file2 = tmp_path / "f2.txt"
        file3 = tmp_path / "f3.txt"
        dst = tmp_path / "dst"

        with file1.open("w") as fh:
            fh.write("its test time\n")
        with file2.open("w") as fh:
            fh.write("f2\n")
        with file3.open("w") as fh:
            fh.write("f3\n")

        self._copy_files([file1, file2, file3], dst)
        assert not dst.is_dir()
        assert dst.is_file()
        # Every source is written to the same path, so only the content of
        # the last file is expected to remain.
        assert dst.read_text() == "f3\n"

    def test_one_file_set_uid(self, tmp_path):
        """Explicitly pass uid/gid values and assert these are passed to chown."""
        # Because this test will often be run by non-root users it is necessary
        # to mock os.chown or we too easily run into perms issues.
        file1 = tmp_path / "f1.txt"
        dst = tmp_path / "dst"
        dst.mkdir(parents=True)

        with file1.open("w") as fh:
            fh.write("its test time\n")

        assert not (dst / "f1.txt").exists()

        with mock.patch("os.chown") as _chown:
            _chown.return_value = None
            self._copy_files([file1], dst, uid=0, gid=0)
            assert len(_chown.mock_calls) >= 1
            for c in _chown.mock_calls:
                assert c == mock.call(mock.ANY, 0, 0)
        assert (dst / "f1.txt").exists()
192 | ||
193 | ||
class TestMoveFiles:
    """Tests for ``_cephadm.move_files`` using files under ``tmp_path``."""

    def _move_files(self, *args, **kwargs):
        """Invoke ``move_files`` with a throwaway cephadm context.

        ``cephadm.extract_uid_gid`` is patched so that it reports the uid/gid
        of the process running the tests.
        """
        with with_cephadm_ctx([]) as ctx:
            with mock.patch("cephadm.extract_uid_gid") as eug:
                eug.return_value = (os.getuid(), os.getgid())
                _cephadm.move_files(ctx, *args, **kwargs)

    def test_one_file(self, tmp_path):
        """Move a named file to test dest path."""
        file1 = tmp_path / "f1.txt"
        dst = tmp_path / "dst"

        with file1.open("w") as fh:
            fh.write("lets moove\n")

        assert not dst.exists()
        assert file1.is_file()

        self._move_files([file1], dst)
        assert dst.is_file()
        assert not file1.exists()

    def test_one_file_destdir(self, tmp_path):
        """Move a file into an existing dest dir."""
        file1 = tmp_path / "f1.txt"
        dst = tmp_path / "dst"
        dst.mkdir(parents=True)

        with file1.open("w") as fh:
            fh.write("lets moove\n")

        assert not (dst / "f1.txt").exists()
        assert file1.is_file()

        self._move_files([file1], dst)
        assert (dst / "f1.txt").is_file()
        assert not file1.exists()

    def test_one_file_one_link(self, tmp_path):
        """Move a file and a symlink to that file to a dest dir."""
        file1 = tmp_path / "f1.txt"
        link1 = tmp_path / "lnk"
        dst = tmp_path / "dst"
        dst.mkdir(parents=True)

        with file1.open("w") as fh:
            fh.write("lets moove\n")
        # The link target is relative, so it must still resolve once both the
        # file and the link live side by side in the destination directory.
        os.symlink("f1.txt", link1)

        assert not (dst / "f1.txt").exists()
        assert file1.is_file()
        assert link1.exists()

        self._move_files([file1, link1], dst)
        assert (dst / "f1.txt").is_file()
        assert (dst / "lnk").is_symlink()
        assert not file1.exists()
        assert not link1.exists()
        # read_text() closes the file; a bare open().read() leaks the handle.
        assert (dst / "f1.txt").read_text() == "lets moove\n"
        assert (dst / "lnk").read_text() == "lets moove\n"

    def test_one_file_set_uid(self, tmp_path):
        """Explicitly pass uid/gid values and assert these are passed to chown."""
        # Because this test will often be run by non-root users it is necessary
        # to mock os.chown or we too easily run into perms issues.
        file1 = tmp_path / "f1.txt"
        dst = tmp_path / "dst"

        with file1.open("w") as fh:
            fh.write("lets moove\n")

        assert not dst.exists()
        assert file1.is_file()

        with mock.patch("os.chown") as _chown:
            _chown.return_value = None
            self._move_files([file1], dst, uid=0, gid=0)
            assert len(_chown.mock_calls) >= 1
            for c in _chown.mock_calls:
                assert c == mock.call(mock.ANY, 0, 0)
        assert dst.is_file()
        assert not file1.exists()
276 | ||
277 | ||
def test_recursive_chown(tmp_path):
    """Check that recursive_chown calls os.chown on the whole tree, in order.

    The tree is ``dir1/dir2/file1.txt``; os.chown is mocked, and exactly three
    calls are expected: the top directory, the nested directory, then the file.
    """
    top = tmp_path / "dir1"
    nested = top / "dir2"
    leaf = nested / "file1.txt"
    nested.mkdir(parents=True)
    leaf.write_text("low down\n")

    with mock.patch("os.chown", return_value=None) as chown_mock:
        _cephadm.recursive_chown(str(top), uid=500, gid=500)

    assert len(chown_mock.mock_calls) == 3
    for recorded, path in zip(chown_mock.mock_calls, (top, nested, leaf)):
        assert recorded == mock.call(str(path), 500, 500)
1e59de90 TL |
294 | |
295 | ||
class TestFindExecutable:
    """Tests for ``_cephadm.find_executable``."""

    def test_standard_exe(self):
        """Look up a command that should exist on the default search path."""
        # `true` is present on pretty much every system, which makes it a safe
        # choice for a lookup that is expected to succeed.
        found = _cephadm.find_executable("true")
        assert found.endswith("true")

    def test_custom_path(self, tmp_path):
        """Pass the full path of an executable script rather than a bare name."""
        script = tmp_path / "foo.sh"
        script.write_text("#!/bin/sh\necho foo\n")
        script.chmod(0o755)

        found = _cephadm.find_executable(script)
        assert str(found) == str(script)

    def test_no_path(self, monkeypatch):
        """`true` must still be found when PATH is removed from the environment."""
        monkeypatch.delenv("PATH")
        found = _cephadm.find_executable("true")
        assert found.endswith("true")

    def test_no_path_no_confstr(self, monkeypatch):
        """`true` must still be found with PATH removed and os.confstr failing."""

        def _raise_value_error(_):
            raise ValueError("fail")

        monkeypatch.delenv("PATH")
        monkeypatch.setattr("os.confstr", _raise_value_error)
        found = _cephadm.find_executable("true")
        assert found.endswith("true")

    def test_unset_path(self):
        """An explicitly empty search path yields no result."""
        assert _cephadm.find_executable("true", path="") is None

    def test_no_such_exe(self):
        """A name that does not exist anywhere yields no result."""
        assert _cephadm.find_executable("foo_bar-baz.noway") is None
334 | ||
335 | ||
def test_find_program():
    """find_program returns a path for `true` and raises ValueError when missing."""
    located = _cephadm.find_program("true")
    assert located.endswith("true")

    # Unlike a None result, a missing program is reported as an exception.
    with pytest.raises(ValueError):
        _cephadm.find_program("foo_bar-baz.noway")
342 | ||
343 | ||
344 | def _mk_fake_call(enabled, active): | |
345 | def _fake_call(ctx, cmd, **kwargs): | |
346 | if "is-enabled" in cmd: | |
347 | if isinstance(enabled, Exception): | |
348 | raise enabled | |
349 | return enabled | |
350 | if "is-active" in cmd: | |
351 | if isinstance(active, Exception): | |
352 | raise active | |
353 | return active | |
354 | raise ValueError("should not get here") | |
355 | ||
356 | return _fake_call | |
357 | ||
358 | ||
@pytest.mark.parametrize(
    "enabled_out, active_out, expected",
    [
        # Each case is: canned is-enabled outcome, canned is-active outcome,
        # and the expected (enabled, state, installed) result.
        # --- everything is fine
        (
            ("", "", 0),
            ("active", "", 0),
            (True, "running", True),
        ),
        # --- disabled, unknown if active
        (
            ("disabled", "", 1),
            ("", "", 0),
            (False, "unknown", True),
        ),
        # --- is-enabled error (not "disabled"), unknown if active
        (
            ("bleh", "", 1),
            ("", "", 0),
            (False, "unknown", False),
        ),
        # --- is-enabled ok, inactive is reported as stopped
        (
            ("", "", 0),
            ("inactive", "", 0),
            (True, "stopped", True),
        ),
        # --- is-enabled ok, failed is reported as error
        (
            ("", "", 0),
            ("failed", "", 0),
            (True, "error", True),
        ),
        # --- is-enabled ok, auto-restart is reported as error
        (
            ("", "", 0),
            ("auto-restart", "", 0),
            (True, "error", True),
        ),
        # --- exception while exec'ing the is-enabled cmd
        (
            ValueError("bonk"),
            ("active", "", 0),
            (False, "running", False),
        ),
        # --- exception while exec'ing the is-active cmd
        (
            ("", "", 0),
            ValueError("blat"),
            (True, "unknown", True),
        ),
    ],
)
def test_check_unit(enabled_out, active_out, expected):
    """check_unit maps is-enabled / is-active outcomes to a 3-value result."""
    fake_call = _mk_fake_call(enabled=enabled_out, active=active_out)
    with with_cephadm_ctx([]) as ctx:
        _cephadm.call.side_effect = fake_call
        outcome = _cephadm.check_unit(ctx, "foobar")
        # tuple() over the result keeps the original "exactly three values,
        # compared as a tuple" check.
        assert tuple(outcome) == expected
420 | ||
421 | ||
class FakeEnabler:
    """Recording stand-in for the ``enabler`` object given to check_time_sync.

    Every service name passed to ``enable_service`` is remembered so that
    ``check_expected`` can verify afterwards whether, and how widely, the
    enabler was used.
    """

    def __init__(self, should_be_called):
        # Whether the test expects enable_service() to be invoked at all.
        self._should_be_called = should_be_called
        self._services = []

    def enable_service(self, service):
        """Record that ``service`` was requested to be enabled."""
        self._services.append(service)

    def check_expected(self):
        """Assert that the recorded calls match the expectation."""
        if self._should_be_called:
            # Per the original note, cephadm currently looks for seven
            # chrony/ntp style services; accept more in case the list grows.
            assert len(self._services) >= 7
            assert "chrony.service" in self._services
            assert "ntp.service" in self._services
            return
        assert not self._services
440 | ||
441 | ||
@pytest.mark.parametrize(
    "call_fn, enabler, expected",
    [
        # No time sync service is enabled.
        (
            _mk_fake_call(
                enabled=("", "", 1),
                active=("", "", 1),
            ),
            None,
            False,
        ),
        # A time sync service is enabled.
        (
            _mk_fake_call(
                enabled=("", "", 0),
                active=("active", "", 0),
            ),
            None,
            True,
        ),
        # Time sync is not enabled and an enabler is supplied. The unit has
        # to be installed but not running for the enabler to be invoked, and
        # it should then be invoked with every known service name.
        (
            _mk_fake_call(
                enabled=("disabled", "", 1),
                active=("", "", 1),
            ),
            FakeEnabler(True),
            False,
        ),
        # Time sync is already enabled; the supplied enabler verifies that
        # it was never invoked.
        (
            _mk_fake_call(
                enabled=("", "", 0),
                active=("active", "", 0),
            ),
            FakeEnabler(False),
            True,
        ),
    ],
)
def test_check_time_sync(call_fn, enabler, expected):
    """Verify check_time_sync's result and its use of the optional enabler.

    The check_time_sync call actually checks if a time synchronization
    service is enabled. It is also the only consumer of check_units.
    """
    with with_cephadm_ctx([]) as ctx:
        _cephadm.call.side_effect = call_fn
        assert _cephadm.check_time_sync(ctx, enabler=enabler) == expected
        if enabler is None:
            return
        enabler.check_expected()
497 | ||
498 | ||
# NOTE(review): the sample file contents below are reproduced line-for-line
# as visible in the reviewed text, without per-line leading whitespace. If the
# original literals were indented, confirm get_distro ignores that indentation.
@pytest.mark.parametrize(
    "content, expected",
    [
        (
            "#JUNK\n"
            "FOO=1\n",
            (None, None, None),
        ),
        (
            "# A sample from a real centos system\n"
            'NAME="CentOS Stream"\n'
            'VERSION="8"\n'
            'ID="centos"\n'
            'ID_LIKE="rhel fedora"\n'
            'VERSION_ID="8"\n'
            'PLATFORM_ID="platform:el8"\n'
            'PRETTY_NAME="CentOS Stream 8"\n'
            'ANSI_COLOR="0;31"\n'
            'CPE_NAME="cpe:/o:centos:centos:8"\n'
            'HOME_URL="https://centos.org/"\n'
            'BUG_REPORT_URL="https://bugzilla.redhat.com/"\n'
            'REDHAT_SUPPORT_PRODUCT="Red Hat Enterprise Linux 8"\n'
            'REDHAT_SUPPORT_PRODUCT_VERSION="CentOS Stream"\n',
            ("centos", "8", None),
        ),
        (
            "# Minimal but complete, made up vals\n"
            'ID="hpec"\n'
            'VERSION_ID="33"\n'
            'VERSION_CODENAME="hpec nimda"\n',
            ("hpec", "33", "hpec nimda"),
        ),
        (
            "# Minimal but complete, no quotes\n"
            "ID=hpec\n"
            "VERSION_ID=33\n"
            "VERSION_CODENAME=hpec nimda\n",
            ("hpec", "33", "hpec nimda"),
        ),
    ],
)
def test_get_distro(monkeypatch, content, expected):
    """get_distro extracts a 3-tuple from the file content it reads.

    ``builtins.open`` is replaced so that any open() call yields ``content``.
    The expected tuples correspond to the ID, VERSION_ID and VERSION_CODENAME
    values in the samples, with None for entries that are absent.
    """
    monkeypatch.setattr(
        "builtins.open", lambda *args, **kwargs: io.StringIO(content)
    )
    assert _cephadm.get_distro() == expected
550 | ||
551 | ||
class FakeContext:
    """Bare-bones object usable as a ``ctx`` argument.

    Intended for tests where with_cephadm_ctx is not appropriate because it
    enables too many mocks.
    """

    # The only attribute provided up front; tests may set others as needed.
    timeout = 30
558 | ||
559 | ||
560 | def _has_non_zero_exit(clog): | |
561 | assert any("Non-zero exit" in ll for _, _, ll in clog.record_tuples) | |
562 | ||
563 | ||
564 | def _has_values_somewhere(clog, values, non_zero=True): | |
565 | if non_zero: | |
566 | _has_non_zero_exit(clog) | |
567 | for value in values: | |
568 | assert any(value in ll for _, _, ll in clog.record_tuples) | |
569 | ||
570 | ||
@pytest.mark.parametrize(
    "pyline, expected, call_kwargs, log_check",
    [
        pytest.param(
            "import time; time.sleep(0.1)",
            ("", "", 0),
            {},
            None,
            id="brief-sleep",
        ),
        pytest.param(
            "import sys; sys.exit(2)",
            ("", "", 2),
            {},
            _has_non_zero_exit,
            id="exit-non-zero",
        ),
        pytest.param(
            "import sys; sys.exit(0)",
            ("", "", 0),
            {"desc": "success"},
            None,
            id="success-with-desc",
        ),
        pytest.param(
            "print('foo'); print('bar')",
            ("foo\nbar\n", "", 0),
            {"desc": "stdout"},
            None,
            id="stdout-print",
        ),
        pytest.param(
            "import sys; sys.stderr.write('la\\nla\\nla\\n')",
            ("", "la\nla\nla\n", 0),
            {"desc": "stderr"},
            None,
            id="stderr-print",
        ),
        pytest.param(
            "for i in range(501): print(i, flush=True)",
            lambda r: r[2] == 0 and r[1] == "" and "500" in r[0].splitlines(),
            {},
            None,
            id="stdout-long",
        ),
        pytest.param(
            "for i in range(1000000): print(i, flush=True)",
            lambda r: r[2] == 0
            and r[1] == ""
            and len(r[0].splitlines()) == 1000000,
            {},
            None,
            id="stdout-very-long",
        ),
        pytest.param(
            "import sys; sys.stderr.write('pow\\noof\\nouch\\n'); sys.exit(1)",
            ("", "pow\noof\nouch\n", 1),
            {"desc": "stderr"},
            functools.partial(
                _has_values_somewhere,
                values=["pow", "oof", "ouch"],
                non_zero=True,
            ),
            id="stderr-logged-non-zero",
        ),
        # Commands that time out collect no logs and return empty
        # std{out,err} strings; 124 is the return code expected for them.
        pytest.param(
            "import time; time.sleep(4)",
            ("", "", 124),
            {"timeout": 1},
            None,
            id="long-sleep",
        ),
        pytest.param(
            "import time\nfor i in range(100):\n\tprint(i, flush=True); time.sleep(0.01)",
            ("", "", 124),
            {"timeout": 0.5},
            None,
            id="slow-print-timeout",
        ),
    ],
)
def test_call(caplog, monkeypatch, pyline, expected, call_kwargs, log_check):
    """Run ``pyline`` in a Python subprocess through ``_cephadm.call``.

    ``expected`` is either the exact result tuple or a predicate applied to
    the result. ``log_check``, when callable, is given ``caplog`` afterwards
    to inspect what was logged.
    """
    import logging

    caplog.set_level(logging.INFO)
    # Send cephadm's logging through the root logger for caplog to inspect.
    monkeypatch.setattr("cephadm.logger", logging.getLogger())

    command = [sys.executable, "-c", pyline]
    result = _cephadm.call(FakeContext(), command, **call_kwargs)

    if not callable(expected):
        assert result == expected
    else:
        assert expected(result)

    if callable(log_check):
        log_check(caplog)
aee94f69 TL |
666 | |
667 | ||
class TestWriteNew:
    """Tests for the ``_cephadm.write_new`` context manager."""

    def test_success(self, tmp_path):
        "Test the simple basic feature of writing a file."
        dest = tmp_path / "foo.txt"
        with _cephadm.write_new(dest) as fh:
            fh.write("something\n")
            fh.write("something else\n")

        with open(dest, "r") as fh:
            assert fh.read() == "something\nsomething else\n"

    def test_write_ower_mode(self, tmp_path):
        "Test that the owner and perms options function."
        dest = tmp_path / "foo.txt"

        # if this is test run as non-root, we can't really change ownership
        uid = os.getuid()
        gid = os.getgid()

        with _cephadm.write_new(dest, owner=(uid, gid), perms=0o600) as fh:
            fh.write("xomething\n")
            fh.write("xomething else\n")

        with open(dest, "r") as fh:
            assert fh.read() == "xomething\nxomething else\n"
            sr = os.fstat(fh.fileno())
            assert sr.st_uid == uid
            assert sr.st_gid == gid
            # Mask off the file-type bits; only the permission bits matter.
            assert (sr.st_mode & 0o777) == 0o600

    def test_encoding(self, tmp_path):
        "Test that the encoding option functions."
        dest = tmp_path / "foo.txt"
        msg = "\u2603\u26C5\n"
        with _cephadm.write_new(dest, encoding='utf-8') as fh:
            fh.write(msg)
        with open(dest, "rb") as fh:
            b1 = fh.read()
            assert b1.decode('utf-8') == msg

        dest = tmp_path / "foo2.txt"
        with _cephadm.write_new(dest, encoding='utf-16le') as fh:
            fh.write(msg)
        with open(dest, "rb") as fh:
            b2 = fh.read()
            assert b2.decode('utf-16le') == msg

        # the binary data should differ due to the different encodings
        assert b1 != b2

    def test_cleanup(self, tmp_path):
        "Test that an exception during write leaves no file behind."
        dest = tmp_path / "foo.txt"
        with pytest.raises(ValueError):
            with _cephadm.write_new(dest) as fh:
                fh.write("hello\n")
                # Abort part-way through; nothing after this point runs, so
                # the unreachable follow-up write has been dropped.
                raise ValueError("foo")
        assert not dest.exists()
        # Neither a "<name>.new" file nor any other file may be left behind.
        assert not dest.with_name(dest.name+".new").exists()
        assert list(dest.parent.iterdir()) == []
729 | ||
730 | ||
class CompareContext1:
    """Deploy config sample with an explicit image and populated params.

    ``cfg_data`` is the input handed to apply_deploy_config_to_ctx and
    ``check`` asserts the attributes expected on the context afterwards.
    """

    cfg_data = {
        "name": "mane",
        "fsid": "foobar",
        "image": "fake.io/noway/nohow:gndn",
        "meta": {
            "fruit": "banana",
            "vegetable": "carrot",
        },
        "params": {
            "osd_fsid": "robble",
            "tcp_ports": [404, 9999],
        },
        "config_blobs": {
            "alpha": {"sloop": "John B"},
            "beta": {"forest": "birch"},
            "gamma": {"forest": "pine"},
        },
    }

    def check(self, ctx):
        """Assert that ``ctx`` carries the values expected for this sample."""
        # Note that "meta" is expected under the name meta_properties and that
        # the "params" entries are expected as individual attributes.
        wanted_attrs = (
            ("name", 'mane'),
            ("fsid", 'foobar'),
            ("image", 'fake.io/noway/nohow:gndn'),
            ("meta_properties", {"fruit": "banana", "vegetable": "carrot"}),
            (
                "config_blobs",
                {
                    "alpha": {"sloop": "John B"},
                    "beta": {"forest": "birch"},
                    "gamma": {"forest": "pine"},
                },
            ),
            ("osd_fsid", "robble"),
            ("tcp_ports", [404, 9999]),
        )
        for attr, wanted in wanted_attrs:
            assert getattr(ctx, attr) == wanted
763 | ||
764 | ||
class CompareContext2:
    """Deploy config sample with no image and an empty params section.

    ``cfg_data`` is the input handed to apply_deploy_config_to_ctx and
    ``check`` asserts the attributes expected on the context afterwards.
    """

    cfg_data = {
        "name": "cc2",
        "fsid": "foobar",
        "meta": {
            "fruit": "banana",
            "vegetable": "carrot",
        },
        "params": {},
        "config_blobs": {
            "alpha": {"sloop": "John B"},
            "beta": {"forest": "birch"},
            "gamma": {"forest": "pine"},
        },
    }

    def check(self, ctx):
        """Assert that ``ctx`` carries the values expected for this sample."""
        wanted_attrs = (
            ("name", 'cc2'),
            ("fsid", 'foobar'),
            # cfg_data has no "image" key, so this value must come from
            # somewhere other than the sample config.
            ("image", 'quay.io/ceph/ceph:v18'),
            ("meta_properties", {"fruit": "banana", "vegetable": "carrot"}),
            (
                "config_blobs",
                {
                    "alpha": {"sloop": "John B"},
                    "beta": {"forest": "birch"},
                    "gamma": {"forest": "pine"},
                },
            ),
        )
        for attr, wanted in wanted_attrs:
            assert getattr(ctx, attr) == wanted

        # With empty params these attributes must exist and be exactly None.
        assert ctx.osd_fsid is None
        assert ctx.tcp_ports is None
793 | ||
794 | ||
@pytest.mark.parametrize(
    "cc",
    [
        CompareContext1(),
        CompareContext2(),
    ],
)
def test_apply_deploy_config_to_ctx(cc, monkeypatch):
    """Apply a sample deploy config to a bare context and verify the result.

    ``cc`` supplies both the input (``cfg_data``) and the verification
    (``check``) for the case.
    """
    import logging

    # Replace cephadm's logger with the root logger for this test.
    monkeypatch.setattr("cephadm.logger", logging.getLogger())

    target_ctx = FakeContext()
    _cephadm.apply_deploy_config_to_ctx(cc.cfg_data, target_ctx)
    cc.check(target_ctx)