]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/progress/test_progress.py
ef4e7e166180571faf51bda6ff191e3e238cea94
[ceph.git] / ceph / src / pybind / mgr / progress / test_progress.py
1 #python unit test
2 import unittest
3 import os
4 import sys
5 from tests import mock
6
7 import pytest
8 import json
9 os.environ['UNITTEST'] = "1"
10 sys.path.insert(0, "../../pybind/mgr")
11 from progress import module
12
class TestPgRecoveryEvent(object):
    """Tests for the PgRecoveryEvent class."""

    def setup_method(self):
        """Create the event under test and mock what it depends on.

        This is named ``setup_method`` because pytest only runs nose-style
        ``setup`` methods through its nose compatibility layer, which was
        deprecated in pytest 7.2 and removed in pytest 8.
        """
        module._module = mock.Mock()  # just so Event._refresh() works
        self.test_event = module.PgRecoveryEvent(
            None,
            None,
            [module.PgId(1, ps) for ps in range(3)],
            [0],
            30,
            False,
        )

    # Keep the old nose-style name resolvable for anything still calling it.
    setup = setup_method

    def test_pg_update(self):
        """The event is complete when every pg reports active+clean."""
        # One entry per pg created in setup_method: "1.0", "1.1" and "1.2".
        pg_progress = {
            "pgs": {
                "1.%d" % ps: {
                    "state": "active+clean",
                    "num_bytes": 10,
                    "num_bytes_recovered": 10,
                    "reported_epoch": 30,
                }
                for ps in range(3)
            },
            "pg_ready": True,
        }
        self.test_event.pg_update(pg_progress, mock.Mock())
        assert self.test_event._progress == 1.0
49
50
class OSDMap:
    """Artificial OSD map used to exercise the ``_osd_in_out`` function.

    It wraps two plain dictionaries: ``dump``, which the pool helpers read a
    ``"pools"`` list from, and ``pg_stats``, which holds a ``"pg_stats"`` list
    of per-pg records. Some of the functions are copied from mgr_module.
    """

    def __init__(self, dump, pg_stats):
        self._dump = dump
        self._pg_stats = pg_stats

    def _pg_to_up_acting_osds(self, pool_id, ps):
        """Return the up/acting mapping of pg ``<pool_id>.<ps>``.

        Returns ``None`` when no record in the pg stats has that pg id.
        """
        pg_id = str(pool_id) + "." + str(ps)
        for pg in self._pg_stats["pg_stats"]:
            if pg["pg_id"] == pg_id:
                return {
                    "up_primary": pg["up_primary"],
                    "acting_primary": pg["acting_primary"],
                    "up": pg["up"],
                    "acting": pg["acting"],
                }
        return None

    def dump(self):
        """Return the dump dictionary given at construction."""
        return self._dump

    def get_pools(self):
        """Return the pools of the dump keyed by their ``'pool'`` value."""
        # self._dump is the stored dictionary, not a callable, so the data
        # has to be read through dump().
        return {p['pool']: p for p in self.dump()['pools']}

    def get_pools_by_name(self):
        """Return the pools of the dump keyed by their ``'pool_name'`` value."""
        return {p['pool_name']: p for p in self.dump()['pools']}

    def pg_to_up_acting_osds(self, pool_id, ps):
        """Public wrapper around ``_pg_to_up_acting_osds``."""
        return self._pg_to_up_acting_osds(pool_id, ps)
88
89
class TestModule(object):
    """Tests for the Module class."""

    def setup_method(self):
        """Create the Module under test and mock a bunch of its attributes.

        This is named ``setup_method`` because pytest only runs nose-style
        ``setup`` methods through its nose compatibility layer, which was
        deprecated in pytest 7.2 and removed in pytest 8.
        """
        # NOTE(review): the three assignments below patch the classes
        # themselves and are never undone, so they stay in effect for any
        # test that runs afterwards in the same process.
        module.PgRecoveryEvent.pg_update = mock.Mock()
        module.Module._ceph_get_option = mock.Mock()  # .__init__
        module.Module._configure_logging = lambda *args: ...  # .__init__
        # so we can see if an event gets created
        self.test_module = module.Module('module_name', 0, 0)
        # so we can call pg_update
        self.test_module.get = mock.Mock()
        # we want just to see if this event gets called
        self.test_module._complete = mock.Mock()
        # so that self.get_osdmap().get_epoch() works
        self.test_module.get_osdmap = mock.Mock()
        # so that Event.refresh() works
        module._module = mock.Mock()

    # Keep the old nose-style name resolvable for anything still calling it.
    setup = setup_method

    def test_osd_in_out(self):
        """Check that the correct event is triggered and then completed."""
        # pg 1.0 before the change: mapped to osds 3 and 0.
        old_pg_stats = {
            "pg_stats": [
                {
                    "pg_id": "1.0",
                    "up_primary": 3,
                    "acting_primary": 3,
                    "up": [3, 0],
                    "acting": [3, 0],
                },
            ]
        }
        # pg 1.0 after the change: mapped to osds 0 and 2.
        new_pg_stats = {
            "pg_stats": [
                {
                    "pg_id": "1.0",
                    "up_primary": 0,
                    "acting_primary": 0,
                    "up": [0, 2],
                    "acting": [0, 2],
                },
            ]
        }

        old_dump = {"pools": [{"pool": 1, "pg_num": 1}]}
        new_dump = {"pools": [{"pool": 1, "pg_num": 1}]}

        new_map = OSDMap(new_dump, new_pg_stats)
        old_map = OSDMap(old_dump, old_pg_stats)

        self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "out")
        # check if only one event is created
        assert len(self.test_module._events) == 1

        self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "in")
        # check if complete function is called
        assert self.test_module._complete.call_count == 1
        # check if a PgRecovery Event was created and pg_update gets triggered
        assert module.PgRecoveryEvent.pg_update.call_count == 2