]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/progress/test_progress.py
import ceph 16.2.7
[ceph.git] / ceph / src / pybind / mgr / progress / test_progress.py
1 #python unit test
2 import unittest
3 import os
4 import sys
5 from tests import mock
6
7 import pytest
8 import json
9 os.environ['UNITTEST'] = "1"
10 sys.path.insert(0, "../../pybind/mgr")
11 from progress import module
12
class TestPgRecoveryEvent(object):
    """Tests for the ``module.PgRecoveryEvent`` class."""

    def setup_method(self):
        """Build a PgRecoveryEvent over PGs 1.0-1.2 with ``_module`` mocked.

        ``setup_method`` is used rather than the nose-style ``setup`` hook,
        which newer pytest releases deprecate and eventually stop calling.
        """
        # Replace the module-level handle with a mock just so
        # Event._refresh() works; the patcher lets teardown_method() put the
        # previous value back instead of leaking the mock into other tests.
        self._module_patcher = mock.patch.object(
            module, '_module', new=mock.Mock(), create=True)
        self._module_patcher.start()
        try:
            self.test_event = module.PgRecoveryEvent(
                None, None,
                [module.PgId(1, i) for i in range(3)],
                [0], 30, False)
        except Exception:
            # pytest does not run teardown_method() when setup fails.
            self._module_patcher.stop()
            raise

    def teardown_method(self):
        """Undo the ``module._module`` replacement made in setup_method()."""
        self._module_patcher.stop()

    def test_pg_update(self):
        """The event reports full progress when every PG is active+clean."""
        # One stats entry per PG the event tracks (1.0, 1.1, 1.2); all of
        # them are active+clean with every byte recovered.
        pg_stats = {
            "pg_stats": [
                {
                    "state": "active+clean",
                    "stat_sum": {
                        "num_bytes": 10,
                        "num_bytes_recovered": 10,
                    },
                    "up": [3, 1],
                    "acting": [3, 1],
                    "pgid": "1.%d" % ps,
                    "reported_epoch": 30,
                }
                for ps in range(3)
            ]
        }

        self.test_event.pg_update(pg_stats, True, mock.Mock())
        assert self.test_event._progress == 1.0
82
class OSDMap:
    """Artificial OSD map giving ``_osd_in_out`` the interface it needs.

    Some of the functions are copied from mgr_module. The object simply
    wraps two plain dictionaries supplied by the test:

    * ``dump``     -- an OSD map dump; the accessors below read its
                      ``"pools"`` list.
    * ``pg_stats`` -- a dict whose ``"pg_stats"`` list holds one entry per
                      PG with ``pg_id``, ``up_primary``, ``acting_primary``,
                      ``up`` and ``acting`` keys.
    """

    def __init__(self, dump, pg_stats):
        self._dump = dump
        self._pg_stats = pg_stats

    def _pg_to_up_acting_osds(self, pool_id, ps):
        """Return the up/acting mapping for PG ``<pool_id>.<ps>``.

        Returns ``None`` when no entry in the PG stats has that id.
        """
        wanted = "{}.{}".format(pool_id, ps)
        for pg in self._pg_stats["pg_stats"]:
            if pg["pg_id"] == wanted:
                return {
                    "up_primary": pg["up_primary"],
                    "acting_primary": pg["acting_primary"],
                    "up": pg["up"],
                    "acting": pg["acting"],
                }
        return None

    def dump(self):
        """Return the dump dictionary this map was built from."""
        return self._dump

    def get_pools(self):
        """Return the pools of the dump keyed by their ``pool`` id."""
        # self._dump is the stored dictionary, not a callable, so the data
        # has to be fetched through dump().
        return {p['pool']: p for p in self.dump()['pools']}

    def get_pools_by_name(self):
        """Return the pools of the dump keyed by their ``pool_name``."""
        return {p['pool_name']: p for p in self.dump()['pools']}

    def pg_to_up_acting_osds(self, pool_id, ps):
        """Public wrapper around :meth:`_pg_to_up_acting_osds`."""
        return self._pg_to_up_acting_osds(pool_id, ps)
120
class TestModule(object):
    """Tests for the ``module.Module`` class."""

    def setup_method(self):
        """Create a Module instance with its collaborators mocked out.

        ``setup_method`` is used rather than the nose-style ``setup`` hook,
        which newer pytest releases deprecate and eventually stop calling.
        Every class- or module-level replacement goes through ``_patch`` so
        that teardown_method() can revert it and other tests see the real
        attributes again.
        """
        self._patchers = []
        try:
            self._patch(module.PgRecoveryEvent, 'pg_update', mock.Mock())
            # Both of these are needed by Module.__init__.
            self._patch(module.Module, '_ceph_get_option', mock.Mock())
            self._patch(module.Module, '_configure_logging',
                        lambda *args: ...)
            # so we can see if an event gets created
            self.test_module = module.Module('module_name', 0, 0)
            # so we can call pg_update
            self.test_module.get = mock.Mock()
            # we want just to see if this event gets called
            self.test_module._complete = mock.Mock()
            # so that self.get_osdmap().get_epoch() works
            self.test_module.get_osdmap = mock.Mock()
            # So that Event.refresh() works. Installed only after the Module
            # has been constructed, so construction cannot replace the mock.
            self._patch(module, '_module', mock.Mock())
        except Exception:
            # pytest does not run teardown_method() when setup fails.
            self.teardown_method()
            raise

    def _patch(self, target, name, replacement):
        """Replace ``target.name`` with ``replacement`` until teardown."""
        patcher = mock.patch.object(target, name, new=replacement,
                                    create=True)
        patcher.start()
        self._patchers.append(patcher)

    def teardown_method(self):
        """Revert every replacement made through ``_patch``, newest first."""
        while self._patchers:
            self._patchers.pop().stop()

    def test_osd_in_out(self):
        """Marking an OSD out then in creates and completes one event."""
        # PG 1.0 lives on OSDs [3, 0] in the old map ...
        old_pg_stats = {
            "pg_stats": [
                {
                    "pg_id": "1.0",
                    "up_primary": 3,
                    "acting_primary": 3,
                    "up": [3, 0],
                    "acting": [3, 0],
                },
            ]
        }
        # ... and on OSDs [0, 2] in the new map, i.e. it moved off OSD 3.
        new_pg_stats = {
            "pg_stats": [
                {
                    "pg_id": "1.0",
                    "up_primary": 0,
                    "acting_primary": 0,
                    "up": [0, 2],
                    "acting": [0, 2],
                },
            ]
        }

        # Both dumps describe the same single pool holding one PG.
        old_dump = {
            "pools": [
                {"pool": 1, "pg_num": 1},
            ]
        }
        new_dump = {
            "pools": [
                {"pool": 1, "pg_num": 1},
            ]
        }

        new_map = OSDMap(new_dump, new_pg_stats)
        old_map = OSDMap(old_dump, old_pg_stats)

        self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "out")
        # check if only one event is created
        assert len(self.test_module._events) == 1

        self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "in")
        # check if complete function is called
        assert self.test_module._complete.call_count == 1
        # check if a PgRecovery Event was created and pg_update gets triggered
        assert module.PgRecoveryEvent.pg_update.call_count == 2