]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | from cStringIO import StringIO |
2 | import logging | |
3 | import json | |
4 | import requests | |
5 | from requests.packages.urllib3.util import Retry | |
6 | from urlparse import urlparse | |
7 | ||
8 | from teuthology.orchestra.connection import split_user | |
9 | from teuthology import misc as teuthology | |
10 | ||
11 | log = logging.getLogger(__name__) | |
12 | ||
def multi_region_enabled(ctx):
    """
    Report whether this run is testing multi-region sync.

    The radosgw-agent task populates ``radosgw_agent`` on the context, so
    its presence is taken as the indicator.

    :param ctx: task context; only membership of 'radosgw_agent' is tested
    :returns: True if 'radosgw_agent' is present in ctx, False otherwise
    """
    agent_state_present = 'radosgw_agent' in ctx
    return agent_state_present
17 | ||
def rgwadmin(ctx, client, cmd, stdin=None, check_status=False,
             format='json'):
    """
    Run ``radosgw-admin`` on the remote that holds the given client role.

    :param ctx: task context (provides the test dir and ``ctx.cluster``)
    :param client: role string, split by teuthology.split_role() into
                   cluster name, daemon type and client id
    :param cmd: list of radosgw-admin arguments appended to the base command
    :param stdin: file-like object used as the command's stdin; a fresh
                  empty StringIO is used when omitted
    :param check_status: passed through to remote.run()
    :param format: value given to radosgw-admin's ``--format`` option
    :returns: tuple (exit status, result). result is None when the exit
              status is truthy or stdout is empty; otherwise the decoded
              JSON, or the raw stdout string if it is not valid JSON.
    """
    if stdin is None:
        # build the buffer per call: a StringIO() default would be created
        # once at definition time and shared by every invocation
        stdin = StringIO()
    log.info('rgwadmin: {client} : {cmd}'.format(client=client, cmd=cmd))
    testdir = teuthology.get_testdir(ctx)
    cluster_name, daemon_type, client_id = teuthology.split_role(client)
    client_with_id = daemon_type + '.' + client_id
    pre = [
        'adjust-ulimits',
        'ceph-coverage',
        '{tdir}/archive/coverage'.format(tdir=testdir),
        'radosgw-admin',
        '--log-to-stderr',
        '--format', format,
        '-n', client_with_id,
        '--cluster', cluster_name,
        ]
    pre.extend(cmd)
    log.info('rgwadmin: cmd=%s', pre)
    # exactly one remote is expected to carry this role; the unpacking
    # fails loudly otherwise
    (remote,) = ctx.cluster.only(client).remotes.keys()
    proc = remote.run(
        args=pre,
        check_status=check_status,
        stdout=StringIO(),
        stderr=StringIO(),
        stdin=stdin,
        )
    r = proc.exitstatus
    out = proc.stdout.getvalue()
    j = None
    if not r and out != '':
        try:
            j = json.loads(out)
        except ValueError:
            # not JSON: hand the raw text back to the caller
            j = out
            log.info(' raw result: %s', j)
        else:
            log.info(' json result: %s', j)
    return (r, j)
55 | ||
def get_user_summary(out, user):
    """
    Pick the summary entry belonging to a user.

    :param out: dictionary with a 'summary' list of per-user dictionaries
    :param user: user name compared against each entry's 'user' value
    :returns: the last matching entry in out['summary']
    :raises AssertionError: if no entry matches, or the match is empty
    """
    matches = [entry for entry in out['summary']
               if entry.get('user') == user]
    # when a user appears more than once, the last entry wins
    found = matches[-1] if matches else None
    if found:
        return found
    raise AssertionError('No summary info found for user: %s' % user)
67 | ||
def get_user_successful_ops(out, user):
    """
    Return the number of successful operations recorded for a user.

    :param out: dictionary with a 'summary' list
    :param user: user name to look up in the summary
    :returns: 0 when the summary list is empty, otherwise the user's
              ['total']['successful_ops'] value
    """
    if len(out['summary']) > 0:
        return get_user_summary(out, user)['total']['successful_ops']
    return 0
73 | ||
def get_zone_host_and_port(ctx, client, zone):
    """
    Find the host and port of a zone's first endpoint.

    The zone is searched for by name in every zonegroup of the period
    returned by ``radosgw-admin period get``.

    :param ctx: task context, passed to rgwadmin()
    :param client: role used to run radosgw-admin
    :param zone: name of the zone to look up
    :returns: tuple (hostname, port); port falls back to 80 when the
              endpoint URL does not specify one
    :raises AssertionError: if no zonegroup contains the zone
    """
    _, period = rgwadmin(ctx, client, check_status=True,
                         cmd=['period', 'get'])
    for zonegroup in period['period_map']['zonegroups']:
        for zone_info in zonegroup['zones']:
            if zone_info['name'] != zone:
                continue
            endpoint = urlparse(zone_info['endpoints'][0])
            port = endpoint.port
            if port is None:
                port = 80
            return endpoint.hostname, port
    # raise explicitly: `assert False` is stripped under python -O, which
    # would turn a missing zone into a silent None return
    raise AssertionError(
        'no endpoint for zone {zone} found'.format(zone=zone))
90 | ||
def get_master_zone(ctx, client):
    """
    Determine the master zone from the current period.

    Only zonegroups whose 'is_master' value equals the string "true" are
    considered; the zonegroup's 'master_zone' is returned once a zone with
    that name is found in the zonegroup's 'zones' list.

    :param ctx: task context, passed to rgwadmin()
    :param client: role used to run ``radosgw-admin period get``
    :returns: the master zone value, or None if it could not be found
    """
    _, period = rgwadmin(ctx, client, check_status=True,
                         cmd=['period', 'get'])
    for zonegroup in period['period_map']['zonegroups']:
        is_master = (zonegroup['is_master'] == "true")
        log.info('zonegroup={z} is_master={ism}'.format(z=zonegroup,
                                                        ism=is_master))
        if not is_master:
            continue
        master_zone = zonegroup['master_zone']
        log.info('master_zone=%s', master_zone)
        # only report the master zone if the zonegroup really lists it
        for zone_info in zonegroup['zones']:
            if zone_info['name'] == master_zone:
                return master_zone
    log.info('couldn\'t find master zone')
    return None
110 | ||
def get_master_client(ctx, clients):
    """
    Find the client whose configured zone is the master zone.

    :param ctx: task context
    :param clients: sequence of client roles; the first one is used to
                    query the master zone
    :returns: the first client whose zone equals the master zone, or None
              if there is no master zone or no client matches
    """
    # can use any client for this as long as system configured correctly
    master_zone = get_master_zone(ctx, clients[0])
    if master_zone:
        for candidate in clients:
            if zone_for_client(ctx, candidate) == master_zone:
                return candidate
    return None
122 | ||
def get_zone_system_keys(ctx, client, zone):
    """
    Fetch the system key pair of a zone.

    :param ctx: task context, passed to rgwadmin()
    :param client: role used to run radosgw-admin
    :param zone: zone name given to ``zone get --rgw-zone``
    :returns: tuple (access_key, secret_key) from the zone's 'system_key'
    """
    admin_cmd = ['zone', 'get', '--rgw-zone', zone]
    _status, zone_json = rgwadmin(ctx, client, check_status=True,
                                  cmd=admin_cmd)
    keys = zone_json['system_key']
    access = keys['access_key']
    secret = keys['secret_key']
    return access, secret
128 | ||
def zone_for_client(ctx, client):
    """
    Look up the 'rgw zone' setting that applies to a client.

    Settings are layered from the 'global' section, then 'client', then
    the section named after the client role; later layers win.

    :param ctx: task context holding per-cluster ceph configuration
    :param client: client role string
    :returns: the 'rgw zone' value, or None if it is not configured
    """
    cluster_name, _, _ = teuthology.split_role(client)
    conf = ctx.ceph[cluster_name].conf
    # merge into a copy: calling update() on the object returned by
    # conf.get('global') would write the client settings back into the
    # shared 'global' section and leak them into later lookups
    ceph_config = dict(conf.get('global', {}))
    ceph_config.update(conf.get('client', {}))
    ceph_config.update(conf.get(client, {}))
    return ceph_config.get('rgw zone')
135 | ||
def region_for_client(ctx, client):
    """
    Look up the 'rgw region' setting that applies to a client.

    Settings are layered from the 'global' section, then 'client', then
    the section named after the client role; later layers win.

    :param ctx: task context holding per-cluster ceph configuration
    :param client: client role string
    :returns: the 'rgw region' value, or None if it is not configured
    """
    cluster_name, _, _ = teuthology.split_role(client)
    conf = ctx.ceph[cluster_name].conf
    # merge into a copy: calling update() on the object returned by
    # conf.get('global') would write the client settings back into the
    # shared 'global' section and leak them into later lookups
    ceph_config = dict(conf.get('global', {}))
    ceph_config.update(conf.get('client', {}))
    ceph_config.update(conf.get(client, {}))
    return ceph_config.get('rgw region')
142 | ||
def radosgw_data_log_window(ctx, client):
    """
    Look up the 'rgw data log window' setting that applies to a client.

    Settings are layered from the 'global' section, then 'client', then
    the section named after the client role; later layers win.

    :param ctx: task context holding per-cluster ceph configuration
    :param client: client role string
    :returns: the 'rgw data log window' value, or 30 if not configured
    """
    cluster_name, _, _ = teuthology.split_role(client)
    conf = ctx.ceph[cluster_name].conf
    # merge into a copy: calling update() on the object returned by
    # conf.get('global') would write the client settings back into the
    # shared 'global' section and leak them into later lookups
    ceph_config = dict(conf.get('global', {}))
    ceph_config.update(conf.get('client', {}))
    ceph_config.update(conf.get(client, {}))
    return ceph_config.get('rgw data log window', 30)
149 | ||
def radosgw_agent_sync_data(ctx, agent_host, agent_port, full=False):
    """
    Ask a radosgw-agent to run a data sync.

    :param ctx: task context (not used here)
    :param agent_host: host the agent listens on
    :param agent_port: port the agent listens on
    :param full: POST to the "full" sync URL when true, "incremental"
                 otherwise
    :returns: the response object of the POST request
    """
    log.info('sync agent {h}:{p}'.format(h=agent_host, p=agent_port))
    if full:
        method = "full"
    else:
        method = "incremental"
    mount_prefix = 'http://{addr}:{port}/'.format(addr=agent_host,
                                                  port=agent_port)
    sync_url = 'http://{addr}:{port}/data/{method}'.format(
        addr=agent_host, port=agent_port, method=method)
    # use retry with backoff to tolerate slow startup of radosgw-agent
    retrying_adapter = requests.adapters.HTTPAdapter(
        max_retries=Retry(total=5, backoff_factor=1))
    session = requests.Session()
    session.mount(mount_prefix, retrying_adapter)
    return session.post(sync_url)
158 | ||
def radosgw_agent_sync_metadata(ctx, agent_host, agent_port, full=False):
    """
    Ask a radosgw-agent to run a metadata sync.

    :param ctx: task context (not used here)
    :param agent_host: host the agent listens on
    :param agent_port: port the agent listens on
    :param full: POST to the "full" sync URL when true, "incremental"
                 otherwise
    :returns: the response object of the POST request
    """
    log.info('sync agent {h}:{p}'.format(h=agent_host, p=agent_port))
    if full:
        method = "full"
    else:
        method = "incremental"
    mount_prefix = 'http://{addr}:{port}/'.format(addr=agent_host,
                                                  port=agent_port)
    sync_url = 'http://{addr}:{port}/metadata/{method}'.format(
        addr=agent_host, port=agent_port, method=method)
    # use retry with backoff to tolerate slow startup of radosgw-agent
    retrying_adapter = requests.adapters.HTTPAdapter(
        max_retries=Retry(total=5, backoff_factor=1))
    session = requests.Session()
    session.mount(mount_prefix, retrying_adapter)
    return session.post(sync_url)
167 | ||
def radosgw_agent_sync_all(ctx, full=False, data=False):
    """
    Trigger a sync on every configured radosgw-agent.

    Does nothing when ``ctx.radosgw_agent.procs`` is empty. For each
    client in ``ctx.radosgw_agent.config`` a metadata sync is requested,
    followed by a data sync when *data* is true.

    :param ctx: task context with ``radosgw_agent`` state
    :param full: request full rather than incremental syncs
    :param data: also request a data sync after the metadata sync
    """
    if not ctx.radosgw_agent.procs:
        return
    # only the client names are needed; the per-client config values and
    # the client's zone were never used by this function
    for agent_client in ctx.radosgw_agent.config:
        sync_host, sync_port = get_sync_agent(ctx, agent_client)
        log.debug('doing a sync via {host1}'.format(host1=sync_host))
        radosgw_agent_sync_metadata(ctx, sync_host, sync_port, full)
        if data:
            radosgw_agent_sync_data(ctx, sync_host, sync_port, full)
177 | ||
def host_for_role(ctx, role):
    """
    Find the host of the target that carries a role.

    Targets and role lists from ctx.config are paired up positionally.

    :param ctx: task context with 'targets' and 'roles' in ctx.config
    :param role: role name to search for
    :returns: host part of the first matching target (as split by
              split_user()), or None if no role list contains the role
    """
    targets = ctx.config['targets']
    role_lists = ctx.config['roles']
    for target, roles in zip(targets, role_lists):
        if role not in roles:
            continue
        host = split_user(target)[1]
        return host
    return None
183 | ||
def get_sync_agent(ctx, source):
    """
    Locate the radosgw-agent that syncs from a given source client.

    :param ctx: task context with a 'tasks' list in ctx.config
    :param source: client role compared against each agent's 'src'
    :returns: tuple (host, port) of the first matching agent, with port
              defaulting to 8000; (None, None) if no agent matches
    """
    agent_sections = (task['radosgw-agent']
                      for task in ctx.config['tasks']
                      if 'radosgw-agent' in task)
    for section in agent_sections:
        for conf in section.values():
            if conf['src'] == source:
                return host_for_role(ctx, source), conf.get('port', 8000)
    return None, None
192 | ||
def extract_zone_info(ctx, client, client_config):
    """
    Get zone information.

    The effective ceph configuration for the client is built by layering
    the 'global', 'client' and '<daemon type>.<client id>' sections; pool
    names not configured explicitly default to
    '.<region>.<zone>.<setting name>'.

    :param ctx: task context holding per-cluster ceph configuration
    :param client: client role string, split by teuthology.split_role()
    :param client_config: client configuration (not used by this function)
    :returns: tuple (region, zone, zone_info dictionary)
    :raises AssertionError: if 'rgw zone', 'rgw region' or
                            'rgw zone root pool' is not configured
    """
    cluster_name, daemon_type, client_id = teuthology.split_role(client)
    client_with_id = daemon_type + '.' + client_id
    conf = ctx.ceph[cluster_name].conf
    # merge into a copy: calling update() on the object returned by
    # conf.get('global') would write the client settings back into the
    # shared 'global' section and leak them into later lookups
    ceph_config = dict(conf.get('global', {}))
    ceph_config.update(conf.get('client', {}))
    ceph_config.update(conf.get(client_with_id, {}))
    for key in ['rgw zone', 'rgw region', 'rgw zone root pool']:
        if key not in ceph_config:
            # explicit raise so the check also runs under python -O
            raise AssertionError(
                'ceph conf must contain {key} for {client}'.format(
                    key=key, client=client))
    region = ceph_config['rgw region']
    zone = ceph_config['rgw zone']
    pool_prefix = '.' + region + '.' + zone + '.'
    zone_info = dict()
    for key in ['rgw control pool', 'rgw gc pool', 'rgw log pool',
                'rgw intent log pool', 'rgw usage log pool',
                'rgw user keys pool', 'rgw user email pool',
                'rgw user swift pool', 'rgw user uid pool',
                'rgw domain root']:
        # 'rgw user keys pool' -> 'user_keys_pool'
        new_key = key.split(' ', 1)[1].replace(' ', '_')

        if key in ceph_config:
            value = ceph_config[key]
            log.debug('{key} specified in ceph_config ({val})'.format(
                key=key, val=value))
            zone_info[new_key] = value
        else:
            zone_info[new_key] = pool_prefix + new_key

    index_pool = pool_prefix + 'index_pool'
    data_pool = pool_prefix + 'data_pool'
    data_extra_pool = pool_prefix + 'data_extra_pool'
    compression_type = ceph_config.get('rgw compression type', '')

    zone_info['placement_pools'] = [{'key': 'default_placement',
                                     'val': {'index_pool': index_pool,
                                             'data_pool': data_pool,
                                             'data_extra_pool': data_extra_pool,
                                             'compression': compression_type}
                                     }]

    # these keys are meant for the zones argument in the region info.  We
    # insert them into zone_info with a different format and then remove them
    # in the fill_in_endpoints() method
    for key in ['rgw log meta', 'rgw log data']:
        if key in ceph_config:
            zone_info[key] = ceph_config[key]

    return region, zone, zone_info
255 | ||
def extract_region_info(region, region_info):
    """
    Build a normalized region description, filling in defaults for any
    field that region_info does not provide.

    :param region: name of the region
    :param region_info: region information (in dictionary form); must
                        contain a non-empty 'zones' list
    :returns: dictionary with the keys name, api_name, is_master, log_meta,
              log_data, master_zone, placement_targets and
              default_placement
    """
    zones = region_info['zones']
    assert isinstance(zones, list) and zones, \
        'zones must be a non-empty list'
    lookup = region_info.get
    fallback_targets = [{'name': 'default_placement', 'tags': []}]
    return {
        'name': region,
        'api_name': lookup('api name', region),
        'is_master': lookup('is master', False),
        'log_meta': lookup('log meta', False),
        'log_data': lookup('log data', False),
        # the first listed zone is the master unless stated otherwise
        'master_zone': lookup('master zone', zones[0]),
        'placement_targets': lookup('placement targets', fallback_targets),
        'default_placement': lookup('default placement',
                                    'default_placement'),
    }
281 | ||
def get_config_master_client(ctx, config, regions):
    """
    Find the client configured for the master zone of the master region.

    :param ctx: task context, passed to extract_zone_info()
    :param config: dictionary mapping client roles to client configuration
    :param regions: dictionary mapping region names to region configuration
    :returns: the client whose (region, zone) matches the master zonegroup
              and its master zone, or None if no region is flagged as
              master or no client matches
    """
    role_zones = {client: extract_zone_info(ctx, client, c_config)
                  for client, c_config in config.items()}
    log.debug('roles_zones = %r', role_zones)
    region_info = {region_name: extract_region_info(region_name, r_config)
                   for region_name, r_config in regions.items()}

    # read master zonegroup and master_zone
    master_zonegroup = None
    master_zone = None
    for zonegroup, zg_info in region_info.items():
        if zg_info['is_master']:
            master_zonegroup = zonegroup
            master_zone = zg_info['master_zone']
            break

    if master_zonegroup is None:
        # no region is flagged as master; without this guard the lookup
        # below would hit unbound names
        return None

    for client in config:
        (zonegroup, zone, zone_info) = role_zones[client]
        if zonegroup == master_zonegroup and zone == master_zone:
            return client

    return None
304 |