]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/progress/test_progress.py
9 os
.environ
['UNITTEST'] = "1"
10 sys
.path
.insert(0, "../../pybind/mgr")
11 from progress
import module
13 class TestPgRecoveryEvent(object):
14 # Testing PgRecoveryEvent class
17 # Creating the class and Mocking
18 # a bunch of attributes for testing
19 module
._module
= mock
.Mock() # just so Event._refresh() works
20 self
.test_event
= module
.PgRecoveryEvent(None, None, [module
.PgId(1,i
) for i
in range(3)], [0], 30)
22 def test_pg_update(self
):
23 # Test for a completed event when the pg states show active+clear
27 "state": "active+clean",
30 "num_bytes_recovered": 10
41 "reported_epoch": "30"
44 "state": "active+clean",
47 "num_bytes_recovered": 10
58 "reported_epoch": "30"
61 "state": "active+clean",
64 "num_bytes_recovered": 10
75 "reported_epoch": "30"
80 self
.test_event
.pg_update(pg_stats
, True, mock
.Mock())
81 assert self
.test_event
._progress
== 1.0
85 # This is an artificial class to help
86 # _osd_in_out function have all the
87 # necessary characteristics, some
88 # of the funcitons are copied from
91 def __init__(self
, dump
, pg_stats
):
93 self
._pg
_stats
= pg_stats
95 def _pg_to_up_acting_osds(self
, pool_id
, ps
):
96 pg_id
= str(pool_id
) + "." + str(ps
)
97 for pg
in self
._pg
_stats
["pg_stats"]:
98 if pg
["pg_id"] == pg_id
:
100 "up_primary": pg
["up_primary"],
101 "acting_primary": pg
["acting_primary"],
103 "acting": pg
["acting"]
112 return dict([(p
['pool'], p
) for p
in d
['pools']])
114 def get_pools_by_name(self
):
116 return dict([(p
['pool_name'], p
) for p
in d
['pools']])
118 def pg_to_up_acting_osds(self
, pool_id
, ps
):
119 return self
._pg
_to
_up
_acting
_osds
(pool_id
, ps
)
121 class TestModule(object):
122 # Testing Module Class
125 # Creating the class and Mocking a
126 # bunch of attributes for testing
128 module
.PgRecoveryEvent
.pg_update
= mock
.Mock()
129 self
.test_module
= module
.Module() # so we can see if an event gets created
130 self
.test_module
.log
= mock
.Mock() # we don't need to log anything
131 self
.test_module
.get
= mock
.Mock() # so we can call pg_update
132 self
.test_module
._complete
= mock
.Mock() # we want just to see if this event gets called
133 self
.test_module
.get_osdmap
= mock
.Mock() # so that self.get_osdmap().get_epoch() works
134 module
._module
= mock
.Mock() # so that Event.refresh() works
136 def test_osd_in_out(self
):
137 # test for the correct event being
138 # triggered and completed.
195 new_map
= OSDMap(new_dump
, new_pg_stats
)
196 old_map
= OSDMap(old_dump
, old_pg_stats
)
197 self
.test_module
._osd
_in
_out
(old_map
, old_dump
, new_map
, 3, "out")
198 # check if only one event is created
199 assert len(self
.test_module
._events
) == 1
200 self
.test_module
._osd
_in
_out
(old_map
, old_dump
, new_map
, 3, "in")
201 # check if complete function is called
202 assert self
.test_module
._complete
.call_count
== 1
203 # check if a PgRecovery Event was created and pg_update gets triggered
204 assert module
.PgRecoveryEvent
.pg_update
.call_count
== 2