# Source: git.proxmox.com Git - ceph.git/blob - ceph/src/cephadm/tests/test_util_funcs.py
# Blob: 270753a554b86e0a2cf65bcb442a95aaebb39974
# Path: [ceph.git] / ceph / src / cephadm / tests / test_util_funcs.py
# Tests for various assorted utility functions found within cephadm
#
import functools
import io
import os
import sys
from unittest import mock

import pytest

from tests.fixtures import with_cephadm_ctx, import_cephadm
13
# Module object returned by the test fixtures' import helper; every test
# below reaches the code under test through this name.
_cephadm = import_cephadm()
15
16
class TestCopyTree:
    """Tests for ``_cephadm.copy_tree`` using real directories under tmp_path."""

    def _copy_tree(self, *args, **kwargs):
        """Call ``copy_tree`` with a fresh ctx, forwarding all arguments.

        ``cephadm.extract_uid_gid`` is patched to report the uid/gid of the
        current process for the duration of the call.
        """
        with with_cephadm_ctx([]) as ctx:
            with mock.patch("cephadm.extract_uid_gid") as eug:
                eug.return_value = (os.getuid(), os.getgid())
                _cephadm.copy_tree(ctx, *args, **kwargs)

    def test_one_dir(self, tmp_path):
        """Copy one dir into a non-existing dest dir."""
        src1 = tmp_path / "src1"
        dst = tmp_path / "dst"
        src1.mkdir(parents=True)

        with (src1 / "foo.txt").open("w") as fh:
            fh.write("hello\n")
            fh.write("earth\n")

        assert not (dst / "foo.txt").exists()

        self._copy_tree([src1], dst)
        assert (dst / "foo.txt").exists()

    def test_one_existing_dir(self, tmp_path):
        """Copy one dir into an existing dest dir."""
        src1 = tmp_path / "src1"
        dst = tmp_path / "dst"
        src1.mkdir(parents=True)
        dst.mkdir(parents=True)

        with (src1 / "foo.txt").open("w") as fh:
            fh.write("hello\n")
            fh.write("earth\n")

        assert not (dst / "src1").exists()

        self._copy_tree([src1], dst)
        # With a pre-existing destination the source dir is nested inside it.
        assert (dst / "src1/foo.txt").exists()

    def test_two_dirs(self, tmp_path):
        """Copy two source directories into an existing dest dir."""
        src1 = tmp_path / "src1"
        src2 = tmp_path / "src2"
        dst = tmp_path / "dst"
        src1.mkdir(parents=True)
        src2.mkdir(parents=True)
        dst.mkdir(parents=True)

        with (src1 / "foo.txt").open("w") as fh:
            fh.write("hello\n")
            fh.write("earth\n")
        with (src2 / "bar.txt").open("w") as fh:
            fh.write("goodbye\n")
            fh.write("mars\n")

        assert not (dst / "src1").exists()
        assert not (dst / "src2").exists()

        self._copy_tree([src1, src2], dst)
        assert (dst / "src1/foo.txt").exists()
        assert (dst / "src2/bar.txt").exists()

    def test_one_dir_set_uid(self, tmp_path):
        """Explicitly pass uid/gid values and assert these are passed to chown."""
        # Because this test will often be run by non-root users it is necessary
        # to mock os.chown or we too easily run into perms issues.
        src1 = tmp_path / "src1"
        dst = tmp_path / "dst"
        src1.mkdir(parents=True)

        with (src1 / "foo.txt").open("w") as fh:
            fh.write("hello\n")
            fh.write("earth\n")

        assert not (dst / "foo.txt").exists()

        with mock.patch("os.chown") as _chown:
            _chown.return_value = None
            self._copy_tree([src1], dst, uid=0, gid=0)
            assert len(_chown.mock_calls) >= 2
            # Every chown call must carry the explicitly requested uid/gid.
            for c in _chown.mock_calls:
                assert c == mock.call(mock.ANY, 0, 0)
        assert (dst / "foo.txt").exists()
99
100
class TestCopyFiles:
    """Tests for ``_cephadm.copy_files`` using real files under tmp_path."""

    def _copy_files(self, *args, **kwargs):
        """Call ``copy_files`` with a fresh ctx, forwarding all arguments.

        ``cephadm.extract_uid_gid`` is patched to report the uid/gid of the
        current process for the duration of the call.
        """
        with with_cephadm_ctx([]) as ctx:
            with mock.patch("cephadm.extract_uid_gid") as eug:
                eug.return_value = (os.getuid(), os.getgid())
                _cephadm.copy_files(ctx, *args, **kwargs)

    def test_one_file(self, tmp_path):
        """Copy one file into the dest dir."""
        file1 = tmp_path / "f1.txt"
        dst = tmp_path / "dst"
        dst.mkdir(parents=True)

        with file1.open("w") as fh:
            fh.write("its test time\n")

        self._copy_files([file1], dst)
        assert (dst / "f1.txt").exists()

    def test_one_file_nodest(self, tmp_path):
        """Copy one file to the given destination path."""
        file1 = tmp_path / "f1.txt"
        dst = tmp_path / "dst"

        with file1.open("w") as fh:
            fh.write("its test time\n")

        self._copy_files([file1], dst)
        assert not dst.is_dir()
        assert dst.is_file()
        # read_text() closes the file; a bare open().read() leaks the handle.
        assert dst.read_text() == "its test time\n"

    def test_three_files(self, tmp_path):
        """Copy three files into the dest dir."""
        file1 = tmp_path / "f1.txt"
        file2 = tmp_path / "f2.txt"
        file3 = tmp_path / "f3.txt"
        dst = tmp_path / "dst"
        dst.mkdir(parents=True)

        with file1.open("w") as fh:
            fh.write("its test time\n")
        with file2.open("w") as fh:
            fh.write("f2\n")
        with file3.open("w") as fh:
            fh.write("f3\n")

        self._copy_files([file1, file2, file3], dst)
        assert (dst / "f1.txt").exists()
        assert (dst / "f2.txt").exists()
        assert (dst / "f3.txt").exists()

    def test_three_files_nodest(self, tmp_path):
        """Copy files to dest path (not a dir). This is not a useful operation."""
        file1 = tmp_path / "f1.txt"
        file2 = tmp_path / "f2.txt"
        file3 = tmp_path / "f3.txt"
        dst = tmp_path / "dst"

        with file1.open("w") as fh:
            fh.write("its test time\n")
        with file2.open("w") as fh:
            fh.write("f2\n")
        with file3.open("w") as fh:
            fh.write("f3\n")

        self._copy_files([file1, file2, file3], dst)
        assert not dst.is_dir()
        assert dst.is_file()
        # Only the content of the last source file is expected at dst.
        assert dst.read_text() == "f3\n"

    def test_one_file_set_uid(self, tmp_path):
        """Explicitly pass uid/gid values and assert these are passed to chown."""
        # Because this test will often be run by non-root users it is necessary
        # to mock os.chown or we too easily run into perms issues.
        file1 = tmp_path / "f1.txt"
        dst = tmp_path / "dst"
        dst.mkdir(parents=True)

        with file1.open("w") as fh:
            fh.write("its test time\n")

        assert not (dst / "f1.txt").exists()

        with mock.patch("os.chown") as _chown:
            _chown.return_value = None
            self._copy_files([file1], dst, uid=0, gid=0)
            assert len(_chown.mock_calls) >= 1
            # Every chown call must carry the explicitly requested uid/gid.
            for c in _chown.mock_calls:
                assert c == mock.call(mock.ANY, 0, 0)
        assert (dst / "f1.txt").exists()
192
193
class TestMoveFiles:
    """Tests for ``_cephadm.move_files`` using real files under tmp_path."""

    def _move_files(self, *args, **kwargs):
        """Call ``move_files`` with a fresh ctx, forwarding all arguments.

        ``cephadm.extract_uid_gid`` is patched to report the uid/gid of the
        current process for the duration of the call.
        """
        with with_cephadm_ctx([]) as ctx:
            with mock.patch("cephadm.extract_uid_gid") as eug:
                eug.return_value = (os.getuid(), os.getgid())
                _cephadm.move_files(ctx, *args, **kwargs)

    def test_one_file(self, tmp_path):
        """Move a named file to test dest path."""
        file1 = tmp_path / "f1.txt"
        dst = tmp_path / "dst"

        with file1.open("w") as fh:
            fh.write("lets moove\n")

        assert not dst.exists()
        assert file1.is_file()

        self._move_files([file1], dst)
        assert dst.is_file()
        assert not file1.exists()

    def test_one_file_destdir(self, tmp_path):
        """Move a file into an existing dest dir."""
        file1 = tmp_path / "f1.txt"
        dst = tmp_path / "dst"
        dst.mkdir(parents=True)

        with file1.open("w") as fh:
            fh.write("lets moove\n")

        assert not (dst / "f1.txt").exists()
        assert file1.is_file()

        self._move_files([file1], dst)
        assert (dst / "f1.txt").is_file()
        assert not file1.exists()

    def test_one_file_one_link(self, tmp_path):
        """Move a file and a symlink to that file to a dest dir."""
        file1 = tmp_path / "f1.txt"
        link1 = tmp_path / "lnk"
        dst = tmp_path / "dst"
        dst.mkdir(parents=True)

        with file1.open("w") as fh:
            fh.write("lets moove\n")
        # Relative link target, so it can resolve next to the moved file too.
        os.symlink("f1.txt", link1)

        assert not (dst / "f1.txt").exists()
        assert file1.is_file()
        assert link1.exists()

        self._move_files([file1, link1], dst)
        assert (dst / "f1.txt").is_file()
        assert (dst / "lnk").is_symlink()
        assert not file1.exists()
        assert not link1.exists()
        # read_text() closes the file; a bare open().read() leaks the handle.
        assert (dst / "f1.txt").read_text() == "lets moove\n"
        assert (dst / "lnk").read_text() == "lets moove\n"

    def test_one_file_set_uid(self, tmp_path):
        """Explicitly pass uid/gid values and assert these are passed to chown."""
        # Because this test will often be run by non-root users it is necessary
        # to mock os.chown or we too easily run into perms issues.
        file1 = tmp_path / "f1.txt"
        dst = tmp_path / "dst"

        with file1.open("w") as fh:
            fh.write("lets moove\n")

        assert not dst.exists()
        assert file1.is_file()

        with mock.patch("os.chown") as _chown:
            _chown.return_value = None
            self._move_files([file1], dst, uid=0, gid=0)
            assert len(_chown.mock_calls) >= 1
            # Every chown call must carry the explicitly requested uid/gid.
            for c in _chown.mock_calls:
                assert c == mock.call(mock.ANY, 0, 0)
        assert dst.is_file()
        assert not file1.exists()
276
277
def test_recursive_chown(tmp_path):
    """recursive_chown must chown the top dir, the nested dir and the file."""
    d1 = tmp_path / "dir1"
    d2 = d1 / "dir2"
    f1 = d2 / "file1.txt"
    d2.mkdir(parents=True)

    with f1.open("w") as fh:
        fh.write("low down\n")

    # os.chown is mocked so the test does not need privileges to change owner.
    with mock.patch("os.chown") as _chown:
        _chown.return_value = None
        _cephadm.recursive_chown(str(d1), uid=500, gid=500)

    # Exactly one call per path, in top-down order.
    assert len(_chown.mock_calls) == 3
    assert _chown.mock_calls[0] == mock.call(str(d1), 500, 500)
    assert _chown.mock_calls[1] == mock.call(str(d2), 500, 500)
    assert _chown.mock_calls[2] == mock.call(str(f1), 500, 500)
294
295
class TestFindExecutable:
    """Tests for ``_cephadm.find_executable``."""

    def test_standard_exe(self):
        """A common executable name resolves to a path ending in that name."""
        # pretty much every system will have `true` on the path. It's a safe
        # choice for the first assertion
        exe = _cephadm.find_executable("true")
        assert exe.endswith("true")

    def test_custom_path(self, tmp_path):
        """A path to an executable script is returned unchanged."""
        foo_sh = tmp_path / "foo.sh"
        with open(foo_sh, "w") as fh:
            fh.write("#!/bin/sh\n")
            fh.write("echo foo\n")
        foo_sh.chmod(0o755)

        exe = _cephadm.find_executable(foo_sh)
        assert str(exe) == str(foo_sh)

    def test_no_path(self, monkeypatch):
        """`true` is still found when PATH is removed from the environment."""
        monkeypatch.delenv("PATH")
        exe = _cephadm.find_executable("true")
        assert exe.endswith("true")

    def test_no_path_no_confstr(self, monkeypatch):
        """`true` is still found with no PATH and os.confstr raising ValueError."""

        def _fail(_):
            raise ValueError("fail")

        monkeypatch.delenv("PATH")
        monkeypatch.setattr("os.confstr", _fail)
        exe = _cephadm.find_executable("true")
        assert exe.endswith("true")

    def test_unset_path(self):
        """An explicitly empty search path yields None."""
        exe = _cephadm.find_executable("true", path="")
        assert exe is None

    def test_no_such_exe(self):
        """An executable name that does not exist yields None."""
        exe = _cephadm.find_executable("foo_bar-baz.noway")
        assert exe is None
334
335
def test_find_program():
    """find_program returns a path for `true` and raises ValueError if missing."""
    exe = _cephadm.find_program("true")
    assert exe.endswith("true")

    with pytest.raises(ValueError):
        _cephadm.find_program("foo_bar-baz.noway")
342
343
344 def _mk_fake_call(enabled, active):
345 def _fake_call(ctx, cmd, **kwargs):
346 if "is-enabled" in cmd:
347 if isinstance(enabled, Exception):
348 raise enabled
349 return enabled
350 if "is-active" in cmd:
351 if isinstance(active, Exception):
352 raise active
353 return active
354 raise ValueError("should not get here")
355
356 return _fake_call
357
358
@pytest.mark.parametrize(
    "enabled_out, active_out, expected",
    [
        (
            # ok, all is well
            ("", "", 0),
            ("active", "", 0),
            (True, "running", True),
        ),
        (
            # disabled, unknown if active
            ("disabled", "", 1),
            ("", "", 0),
            (False, "unknown", True),
        ),
        (
            # is-enabled error (not disabled), unknown if active
            ("bleh", "", 1),
            ("", "", 0),
            (False, "unknown", False),
        ),
        (
            # is-enabled ok, inactive is stopped
            ("", "", 0),
            ("inactive", "", 0),
            (True, "stopped", True),
        ),
        (
            # is-enabled ok, failed is error
            ("", "", 0),
            ("failed", "", 0),
            (True, "error", True),
        ),
        (
            # is-enabled ok, auto-restart is error
            ("", "", 0),
            ("auto-restart", "", 0),
            (True, "error", True),
        ),
        (
            # error exec'ing is-enabled cmd
            ValueError("bonk"),
            ("active", "", 0),
            (False, "running", False),
        ),
        (
            # error exec'ing is-active cmd
            ("", "", 0),
            ValueError("blat"),
            (True, "unknown", True),
        ),
    ],
)
def test_check_unit(enabled_out, active_out, expected):
    """check_unit maps is-enabled/is-active results to (enabled, state, installed).

    ``enabled_out`` and ``active_out`` are the canned results (or exceptions
    to raise) for the two commands; ``expected`` is the tuple check_unit must
    return.
    """
    with with_cephadm_ctx([]) as ctx:
        _cephadm.call.side_effect = _mk_fake_call(
            enabled=enabled_out,
            active=active_out,
        )
        enabled, state, installed = _cephadm.check_unit(ctx, "foobar")
    assert (enabled, state, installed) == expected
420
421
class FakeEnabler:
    """Test double that records every service name it is asked to enable.

    :param should_be_called: whether ``check_expected`` should require that
        ``enable_service`` was invoked (True) or never invoked (False).
    """

    def __init__(self, should_be_called):
        self._should_be_called = should_be_called
        self._services = []

    def enable_service(self, service):
        """Record ``service`` instead of enabling anything."""
        self._services.append(service)

    def check_expected(self):
        """Assert the recorded calls match the expectation set at construction."""
        if not self._should_be_called:
            assert not self._services
            return
        # there are currently seven chrony/ntp type services that
        # cephadm looks for. Make sure it probed for each of them
        # or more in case someone adds to the list.
        assert len(self._services) >= 7
        assert "chrony.service" in self._services
        assert "ntp.service" in self._services
440
441
@pytest.mark.parametrize(
    "call_fn, enabler, expected",
    [
        # Test that time sync services are not enabled
        (
            _mk_fake_call(
                enabled=("", "", 1),
                active=("", "", 1),
            ),
            None,
            False,
        ),
        # Test that time sync service is enabled
        (
            _mk_fake_call(
                enabled=("", "", 0),
                active=("active", "", 0),
            ),
            None,
            True,
        ),
        # Test that time sync is not enabled, and try to enable them.
        # This one needs to be not running, but installed in order to
        # call the enabler. It should call the enabler with every known
        # service name.
        (
            _mk_fake_call(
                enabled=("disabled", "", 1),
                active=("", "", 1),
            ),
            FakeEnabler(True),
            False,
        ),
        # Test that time sync is enabled, with an enabler passed which
        # will check that the enabler was never called.
        (
            _mk_fake_call(
                enabled=("", "", 0),
                active=("active", "", 0),
            ),
            FakeEnabler(False),
            True,
        ),
    ],
)
def test_check_time_sync(call_fn, enabler, expected):
    """The check_time_sync call actually checks if a time synchronization service
    is enabled. It is also the only consumer of check_units.
    """
    with with_cephadm_ctx([]) as ctx:
        _cephadm.call.side_effect = call_fn
        result = _cephadm.check_time_sync(ctx, enabler=enabler)
        assert result == expected
        # When an enabler was supplied, let it verify how it was used.
        if enabler is not None:
            enabler.check_expected()
497
498
@pytest.mark.parametrize(
    "content, expected",
    [
        (
            # no recognised keys at all
            "#JUNK\n"
            "FOO=1\n",
            (None, None, None),
        ),
        (
            "# A sample from a real centos system\n"
            'NAME="CentOS Stream"\n'
            'VERSION="8"\n'
            'ID="centos"\n'
            'ID_LIKE="rhel fedora"\n'
            'VERSION_ID="8"\n'
            'PLATFORM_ID="platform:el8"\n'
            'PRETTY_NAME="CentOS Stream 8"\n'
            'ANSI_COLOR="0;31"\n'
            'CPE_NAME="cpe:/o:centos:centos:8"\n'
            'HOME_URL="https://centos.org/"\n'
            'BUG_REPORT_URL="https://bugzilla.redhat.com/"\n'
            'REDHAT_SUPPORT_PRODUCT="Red Hat Enterprise Linux 8"\n'
            'REDHAT_SUPPORT_PRODUCT_VERSION="CentOS Stream"\n',
            ("centos", "8", None),
        ),
        (
            "# Minimal but complete, made up vals\n"
            'ID="hpec"\n'
            'VERSION_ID="33"\n'
            'VERSION_CODENAME="hpec nimda"\n',
            ("hpec", "33", "hpec nimda"),
        ),
        (
            "# Minimal but complete, no quotes\n"
            "ID=hpec\n"
            "VERSION_ID=33\n"
            "VERSION_CODENAME=hpec nimda\n",
            ("hpec", "33", "hpec nimda"),
        ),
    ],
)
def test_get_distro(monkeypatch, content, expected):
    """get_distro must return ``expected`` when the file it opens holds ``content``."""

    def _fake_open(*args, **kwargs):
        # Whatever path get_distro opens, serve the canned content.
        return io.StringIO(content)

    monkeypatch.setattr("builtins.open", _fake_open)
    assert _cephadm.get_distro() == expected
550
551
class FakeContext:
    """FakeContext is a minimal type for passing as a ctx, when
    with_cephadm_ctx is not appropriate (it enables too many mocks, etc).
    """

    # The only attribute provided up front; tests may set more on instances.
    timeout = 30
558
559
560 def _has_non_zero_exit(clog):
561 assert any("Non-zero exit" in ll for _, _, ll in clog.record_tuples)
562
563
564 def _has_values_somewhere(clog, values, non_zero=True):
565 if non_zero:
566 _has_non_zero_exit(clog)
567 for value in values:
568 assert any(value in ll for _, _, ll in clog.record_tuples)
569
570
@pytest.mark.parametrize(
    "pyline, expected, call_kwargs, log_check",
    [
        pytest.param(
            "import time; time.sleep(0.1)",
            ("", "", 0),
            {},
            None,
            id="brief-sleep",
        ),
        pytest.param(
            "import sys; sys.exit(2)",
            ("", "", 2),
            {},
            _has_non_zero_exit,
            id="exit-non-zero",
        ),
        pytest.param(
            "import sys; sys.exit(0)",
            ("", "", 0),
            {"desc": "success"},
            None,
            id="success-with-desc",
        ),
        pytest.param(
            "print('foo'); print('bar')",
            ("foo\nbar\n", "", 0),
            {"desc": "stdout"},
            None,
            id="stdout-print",
        ),
        pytest.param(
            "import sys; sys.stderr.write('la\\nla\\nla\\n')",
            ("", "la\nla\nla\n", 0),
            {"desc": "stderr"},
            None,
            id="stderr-print",
        ),
        pytest.param(
            "for i in range(501): print(i, flush=True)",
            lambda r: r[2] == 0 and r[1] == "" and "500" in r[0].splitlines(),
            {},
            None,
            id="stdout-long",
        ),
        pytest.param(
            "for i in range(1000000): print(i, flush=True)",
            lambda r: (
                r[2] == 0
                and r[1] == ""
                and len(r[0].splitlines()) == 1000000
            ),
            {},
            None,
            id="stdout-very-long",
        ),
        pytest.param(
            "import sys; sys.stderr.write('pow\\noof\\nouch\\n'); sys.exit(1)",
            ("", "pow\noof\nouch\n", 1),
            {"desc": "stderr"},
            functools.partial(
                _has_values_somewhere,
                values=["pow", "oof", "ouch"],
                non_zero=True,
            ),
            id="stderr-logged-non-zero",
        ),
        # Commands that time out collect no logs, return empty std{out,err} strings
        pytest.param(
            "import time; time.sleep(4)",
            ("", "", 124),
            {"timeout": 1},
            None,
            id="long-sleep",
        ),
        pytest.param(
            "import time\nfor i in range(100):\n\tprint(i, flush=True); time.sleep(0.01)",
            ("", "", 124),
            {"timeout": 0.5},
            None,
            id="slow-print-timeout",
        ),
    ],
)
def test_call(caplog, monkeypatch, pyline, expected, call_kwargs, log_check):
    """Run a Python one-liner through ``_cephadm.call`` and check the result.

    :param pyline: source passed to ``sys.executable -c``.
    :param expected: either the exact result tuple, or a callable that takes
        the result and returns a truthy value on success.
    :param call_kwargs: extra keyword arguments forwarded to ``call``.
    :param log_check: optional callable given ``caplog`` to assert on logs.
    """
    import logging

    caplog.set_level(logging.INFO)
    # Route cephadm's logging to the root logger for the duration of the test.
    monkeypatch.setattr("cephadm.logger", logging.getLogger())
    ctx = FakeContext()
    result = _cephadm.call(ctx, [sys.executable, "-c", pyline], **call_kwargs)
    if callable(expected):
        assert expected(result)
    else:
        assert result == expected
    if callable(log_check):
        log_check(caplog)
666
667
class TestWriteNew:
    """Tests for the ``_cephadm.write_new`` context manager."""

    def test_success(self, tmp_path):
        "Test the simple basic feature of writing a file."
        dest = tmp_path / "foo.txt"
        with _cephadm.write_new(dest) as fh:
            fh.write("something\n")
            fh.write("something else\n")

        with open(dest, "r") as fh:
            assert fh.read() == "something\nsomething else\n"

    def test_write_ower_mode(self, tmp_path):
        "Test that the owner and perms options function."
        dest = tmp_path / "foo.txt"

        # if this is test run as non-root, we can't really change ownership
        uid = os.getuid()
        gid = os.getgid()

        with _cephadm.write_new(dest, owner=(uid, gid), perms=0o600) as fh:
            fh.write("xomething\n")
            fh.write("xomething else\n")

        with open(dest, "r") as fh:
            assert fh.read() == "xomething\nxomething else\n"
            # fstat on the open handle to inspect owner and permission bits
            sr = os.fstat(fh.fileno())
            assert sr.st_uid == uid
            assert sr.st_gid == gid
            assert (sr.st_mode & 0o777) == 0o600

    def test_encoding(self, tmp_path):
        "Test that the encoding option functions."
        dest = tmp_path / "foo.txt"
        msg = "\u2603\u26C5\n"
        with _cephadm.write_new(dest, encoding='utf-8') as fh:
            fh.write(msg)
        with open(dest, "rb") as fh:
            b1 = fh.read()
        assert b1.decode('utf-8') == msg

        dest = tmp_path / "foo2.txt"
        with _cephadm.write_new(dest, encoding='utf-16le') as fh:
            fh.write(msg)
        with open(dest, "rb") as fh:
            b2 = fh.read()
        assert b2.decode('utf-16le') == msg

        # the binary data should differ due to the different encodings
        assert b1 != b2

    def test_cleanup(self, tmp_path):
        "Test that an exception during write leaves no file behind."
        dest = tmp_path / "foo.txt"
        with pytest.raises(ValueError):
            with _cephadm.write_new(dest) as fh:
                fh.write("hello\n")
                raise ValueError("foo")
                fh.write("world\n")  # never reached: the raise above aborts the write
        assert not dest.exists()
        assert not dest.with_name(dest.name+".new").exists()
        assert list(dest.parent.iterdir()) == []
729
730
class CompareContext1:
    """Deploy-config fixture with an explicit image and populated params.

    ``cfg_data`` is the input config; ``check`` asserts the attribute values
    a ctx is expected to carry once that config has been applied to it.
    """

    cfg_data = {
        "name": "mane",
        "fsid": "foobar",
        "image": "fake.io/noway/nohow:gndn",
        "meta": {
            "fruit": "banana",
            "vegetable": "carrot",
        },
        "params": {
            "osd_fsid": "robble",
            "tcp_ports": [404, 9999],
        },
        "config_blobs": {
            "alpha": {"sloop": "John B"},
            "beta": {"forest": "birch"},
            "gamma": {"forest": "pine"},
        },
    }

    def check(self, ctx):
        """Assert that ``ctx`` carries the values from ``cfg_data``."""
        assert ctx.name == 'mane'
        assert ctx.fsid == 'foobar'
        assert ctx.image == 'fake.io/noway/nohow:gndn'
        # the "meta" section is expected under the name meta_properties
        assert ctx.meta_properties == {"fruit": "banana", "vegetable": "carrot"}
        assert ctx.config_blobs == {
            "alpha": {"sloop": "John B"},
            "beta": {"forest": "birch"},
            "gamma": {"forest": "pine"},
        }
        # entries of "params" are expected as individual ctx attributes
        assert ctx.osd_fsid == "robble"
        assert ctx.tcp_ports == [404, 9999]
763
764
class CompareContext2:
    """Deploy-config fixture with no image and an empty params section.

    ``cfg_data`` is the input config; ``check`` asserts the attribute values
    a ctx is expected to carry once that config has been applied to it.
    """

    cfg_data = {
        "name": "cc2",
        "fsid": "foobar",
        "meta": {
            "fruit": "banana",
            "vegetable": "carrot",
        },
        "params": {},
        "config_blobs": {
            "alpha": {"sloop": "John B"},
            "beta": {"forest": "birch"},
            "gamma": {"forest": "pine"},
        },
    }

    def check(self, ctx):
        """Assert that ``ctx`` carries the values from ``cfg_data`` plus defaults."""
        assert ctx.name == 'cc2'
        assert ctx.fsid == 'foobar'
        # cfg_data has no "image" key, so this specific image is expected
        assert ctx.image == 'quay.io/ceph/ceph:v18'
        assert ctx.meta_properties == {"fruit": "banana", "vegetable": "carrot"}
        assert ctx.config_blobs == {
            "alpha": {"sloop": "John B"},
            "beta": {"forest": "birch"},
            "gamma": {"forest": "pine"},
        }
        # "params" is empty, so these are expected to be None
        assert ctx.osd_fsid is None
        assert ctx.tcp_ports is None
793
794
@pytest.mark.parametrize(
    "cc",
    [
        CompareContext1(),
        CompareContext2(),
    ],
)
def test_apply_deploy_config_to_ctx(cc, monkeypatch):
    """Apply ``cc.cfg_data`` to a FakeContext and let ``cc.check`` verify it."""
    import logging

    # Route cephadm's logging to the root logger for the duration of the test.
    monkeypatch.setattr("cephadm.logger", logging.getLogger())
    ctx = FakeContext()
    _cephadm.apply_deploy_config_to_ctx(cc.cfg_data, ctx)
    cc.check(ctx)