# Web-page header captured together with this file (not Python code):
#   git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/progress/test_progress.py
#   import 15.2.5
#   [ceph.git] / ceph / src / pybind / mgr / progress / test_progress.py
1 #python unit test
2 import unittest
3 import os
4 import sys
5 from tests import mock
6
7 import pytest
8 import json
9 os.environ['UNITTEST'] = "1"
10 sys.path.insert(0, "../../pybind/mgr")
11 from progress import module
12
class TestPgRecoveryEvent(object):
    """Tests for the PgRecoveryEvent class."""

    def setup_method(self):
        """Create a fresh PgRecoveryEvent (PGs 1.0, 1.1, 1.2) before each test.

        This is pytest's xunit-style ``setup_method`` hook.  The nose-style
        name ``setup`` was deprecated in pytest 7.2 and is no longer called
        from pytest 8.0 on, which would leave ``self.test_event`` unset.
        """
        # Mock the module-level handle, just so Event._refresh() works.
        module._module = mock.Mock()
        self.test_event = module.PgRecoveryEvent(
            None, None, [module.PgId(1, i) for i in range(3)], [0], 30)

    def test_pg_update(self):
        """The event is complete when every PG state shows active+clean."""
        # Three identical, fully recovered PGs that differ only in their pgid.
        pg_stats = {
            "pg_stats": [
                {
                    "state": "active+clean",
                    "stat_sum": {
                        "num_bytes": 10,
                        "num_bytes_recovered": 10
                    },
                    "up": [3, 1],
                    "acting": [3, 1],
                    "pgid": "1." + str(ps),
                    "reported_epoch": "30"
                }
                for ps in range(3)
            ]
        }

        self.test_event.pg_update(pg_stats, True, mock.Mock())
        assert self.test_event._progress == 1.0
82
class OSDMap:
    """Artificial OSD map backed by plain dicts.

    It exists to give the ``_osd_in_out`` function under test an object with
    all the necessary characteristics; some of the functions are copied from
    mgr_module.
    """

    def __init__(self, dump, pg_stats):
        """Store the canned data.

        :param dump: dict with a ``"pools"`` list (read by ``get_pools*``)
        :param pg_stats: dict with a ``"pg_stats"`` list whose entries carry
            ``pg_id``, ``up_primary``, ``acting_primary``, ``up``, ``acting``
        """
        self._dump = dump
        self._pg_stats = pg_stats

    def _pg_to_up_acting_osds(self, pool_id, ps):
        """Return the up/acting info of PG ``<pool_id>.<ps>``, or None."""
        pg_id = str(pool_id) + "." + str(ps)
        for pg in self._pg_stats["pg_stats"]:
            if pg["pg_id"] == pg_id:
                return {
                    "up_primary": pg["up_primary"],
                    "acting_primary": pg["acting_primary"],
                    "up": pg["up"],
                    "acting": pg["acting"]
                }
        # No entry with that pg_id.
        return None

    def dump(self):
        """Return the dump dict given to the constructor."""
        return self._dump

    def get_pools(self):
        """Return the pools of the dump keyed by their ``pool`` id."""
        # self._dump is the stored dict here, not a callable, so it must be
        # read through dump(); calling self._dump() raises TypeError.
        d = self.dump()
        return {p['pool']: p for p in d['pools']}

    def get_pools_by_name(self):
        """Return the pools of the dump keyed by their ``pool_name``."""
        d = self.dump()
        return {p['pool_name']: p for p in d['pools']}

    def pg_to_up_acting_osds(self, pool_id, ps):
        """Public wrapper around ``_pg_to_up_acting_osds``."""
        return self._pg_to_up_acting_osds(pool_id, ps)
120
class TestModule(object):
    """Tests for the Module class."""

    def setup_method(self):
        """Create a Module with a bunch of attributes mocked, before each test.

        This is pytest's xunit-style ``setup_method`` hook.  The nose-style
        name ``setup`` was deprecated in pytest 7.2 and is no longer called
        from pytest 8.0 on, which would leave ``self.test_module`` unset.
        """
        # pg_update is replaced on the class itself, so remember the real
        # one; teardown_method puts it back so other tests are not affected.
        self._orig_pg_update = module.PgRecoveryEvent.pg_update
        module.PgRecoveryEvent.pg_update = mock.Mock()
        self.test_module = module.Module()  # so we can see if an event gets created
        self.test_module.log = mock.Mock()  # we don't need to log anything
        self.test_module.get = mock.Mock()  # so we can call pg_update
        self.test_module._complete = mock.Mock()  # we want just to see if this event gets called
        self.test_module.get_osdmap = mock.Mock()  # so that self.get_osdmap().get_epoch() works
        module._module = mock.Mock()  # so that Event.refresh() works

    def teardown_method(self):
        """Restore the real PgRecoveryEvent.pg_update after each test."""
        module.PgRecoveryEvent.pg_update = self._orig_pg_update

    def test_osd_in_out(self):
        """Check that the correct event is triggered and completed."""
        old_pg_stats = {
            "pg_stats": [
                {
                    "pg_id": "1.0",
                    "up_primary": 3,
                    "acting_primary": 3,
                    "up": [3, 0],
                    "acting": [3, 0]
                },
            ]
        }
        new_pg_stats = {
            "pg_stats": [
                {
                    "pg_id": "1.0",
                    "up_primary": 0,
                    "acting_primary": 0,
                    "up": [0, 2],
                    "acting": [0, 2]
                },
            ]
        }

        old_dump = {
            "pools": [
                {
                    "pool": 1,
                    "pg_num": 1
                }
            ]
        }

        new_dump = {
            "pools": [
                {
                    "pool": 1,
                    "pg_num": 1
                }
            ]
        }

        new_map = OSDMap(new_dump, new_pg_stats)
        old_map = OSDMap(old_dump, old_pg_stats)
        self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "out")
        # check if only one event is created
        assert len(self.test_module._events) == 1
        self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "in")
        # check if complete function is called
        assert self.test_module._complete.call_count == 1
        # check if a PgRecovery Event was created and pg_update gets triggered
        assert module.PgRecoveryEvent.pg_update.call_count == 2