]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | |
2 | """ | |
3 | watch_notify_same_primary task | |
4 | """ | |
9f95a23c | 5 | from io import BytesIO |
7c673cae FG |
6 | import contextlib |
7 | import logging | |
8 | ||
9f95a23c TL |
9 | import six |
10 | ||
7c673cae FG |
11 | from teuthology.orchestra import run |
12 | from teuthology.contextutil import safe_while | |
13 | ||
14 | log = logging.getLogger(__name__) | |
15 | ||
16 | ||
@contextlib.contextmanager
def task(ctx, config):
    """
    Run watch_notify_same_primary

    The config should be as follows:

    watch_notify_same_primary:
        clients: [client list]

    The client list should contain 1 client

    The test requires 3 osds.

    example:

    tasks:
    - ceph:
    - watch_notify_same_primary:
        clients: [client.0]
    - interactive:
    """
    log.info('Beginning watch_notify_same_primary...')
    assert isinstance(config, dict), \
        "please list clients to run on"

    clients = config.get('clients', ['client.0'])
    assert len(clients) == 1
    role = clients[0]
    # Plain str is sufficient on Python 3 (six.string_types == (str,)).
    assert isinstance(role, str)
    PREFIX = 'client.'
    assert role.startswith(PREFIX)
    (remote,) = ctx.cluster.only(role).remotes.keys()
    manager = ctx.managers['ceph']
    # 'noout' keeps the osdmap stable when we later kill an OSD, so the
    # objects keep the same primary for the duration of the test.
    manager.raw_cluster_cmd('osd', 'set', 'noout')

    pool = manager.create_pool_with_unique_name()

    def obj(n):
        # Name of the nth test object.
        return "foo-{num}".format(num=n)

    def start_watch(n):
        # Create the nth object, then start a background "rados watch"
        # on it; the returned proc is joined later via run.wait().
        remote.run(
            args=[
                "rados",
                "-p", pool,
                "put",
                obj(n),
                "/etc/resolv.conf"],
            logger=log.getChild('watch.{id}'.format(id=n)))
        proc = remote.run(
            args=[
                "rados",
                "-p", pool,
                "watch",
                obj(n)],
            stdin=run.PIPE,
            stdout=BytesIO(),
            stderr=BytesIO(),
            wait=False)
        return proc

    num = 20

    watches = [start_watch(i) for i in range(num)]

    # wait for them all to register
    for i in range(num):
        with safe_while() as proceed:
            while proceed():
                lines = remote.sh(
                    ["rados", "-p", pool, "listwatchers", obj(i)])
                num_watchers = lines.count('watcher=')
                log.info('i see %d watchers for %s', num_watchers, obj(i))
                if num_watchers >= 1:
                    break

    def notify(n, msg):
        # Send a notify to the nth object; every registered watcher
        # should see msg on its "rados watch" stdout.
        remote.run(
            args=[
                "rados",
                "-p", pool,
                "notify",
                obj(n),
                msg],
            logger=log.getChild('notify.{id}'.format(id=n)))

    # Plain loops instead of side-effect-only list comprehensions.
    for n in range(len(watches)):
        notify(n, 'notify1')

    # Kill one OSD; with 'noout' set the mapping does not change, so the
    # (same) primary must keep delivering notifies to the watchers.
    manager.kill_osd(0)
    manager.mark_down_osd(0)

    for n in range(len(watches)):
        notify(n, 'notify2')

    try:
        yield
    finally:
        log.info('joining watch_notify_stress')
        for watch in watches:
            # A newline on stdin tells "rados watch" to exit.
            watch.stdin.write("\n")

        run.wait(watches)

        for watch in watches:
            # stdout was captured into a BytesIO, so getvalue() returns
            # bytes; decode before mixing with str (bytes.split("\n") or
            # 'notify1' in <bytes> would raise TypeError on Python 3).
            output = watch.stdout.getvalue().decode(errors='replace')
            lines = output.split("\n")
            got1 = any('notify1' in line for line in lines)
            got2 = any('notify2' in line for line in lines)
            log.info(lines)
            # Each watcher must have seen both notifies — the one sent
            # before the OSD was killed and the one sent after.
            assert got1 and got2

        manager.revive_osd(0)
        manager.remove_pool(pool)