]>
Commit | Line | Data |
---|---|---|
1 | import json | |
2 | import random | |
3 | import string | |
4 | import sys | |
5 | import time | |
6 | import logging | |
7 | ||
8 | try: | |
9 | from itertools import izip_longest as zip_longest | |
10 | except ImportError: | |
11 | from itertools import zip_longest | |
12 | from itertools import combinations | |
13 | ||
14 | import boto | |
15 | import boto.s3.connection | |
16 | from boto.s3.website import WebsiteConfiguration | |
17 | from boto.s3.cors import CORSConfiguration | |
18 | ||
19 | from nose.tools import eq_ as eq | |
20 | from nose.plugins.attrib import attr | |
21 | from nose.plugins.skip import SkipTest | |
22 | ||
23 | from .multisite import Zone | |
24 | ||
25 | from .conn import get_gateway_connection | |
26 | ||
class Config:
    """Timing knobs for the multisite tests; unknown kwargs are ignored."""
    def __init__(self, **kwargs):
        # wait up to checkpoint_retries * checkpoint_delay seconds (default
        # 5 minutes) before giving up on a sync checkpoint; allow some time
        # for realm reconfiguration after changing master zone
        defaults = {'checkpoint_retries': 60,
                    'checkpoint_delay': 5,
                    'reconfigure_delay': 5}
        for name, default in defaults.items():
            setattr(self, name, kwargs.get(name, default))
35 | ||
# rgw multisite tests, written against the interfaces provided in rgw_multi.
# these tests must be initialized and run by another module that provides
# implementations of these interfaces by calling init_multi()
realm = None
user = None
config = None
def init_multi(_realm, _user, _config=None):
    """Install the realm/user/config module globals and wait for the
    initial metadata sync to finish across the realm."""
    global realm, user, config
    realm = _realm
    user = _user
    config = _config or Config()
    realm_meta_checkpoint(realm)
50 | ||
def get_realm():
    """Return the realm installed by init_multi() (None before init)."""
    return realm
53 | ||
log = logging.getLogger(__name__)

# counter used by gen_bucket_name() to make names unique within this run
num_buckets = 0
# random per-run prefix so bucket names don't collide across test runs
run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
58 | ||
# NOTE: this definition shadows the get_gateway_connection imported from
# .conn at the top of the file
def get_gateway_connection(gateway, credentials):
    """Return an S3 connection to the given gateway, creating and caching
    it on the gateway object on first use."""
    if gateway.connection is not None:
        return gateway.connection
    gateway.connection = boto.connect_s3(
        aws_access_key_id=credentials.access_key,
        aws_secret_access_key=credentials.secret,
        host=gateway.host,
        port=gateway.port,
        is_secure=False,
        calling_format=boto.s3.connection.OrdinaryCallingFormat())
    return gateway.connection
70 | ||
def get_zone_connection(zone, credentials):
    """Connect to the zone's first gateway; a credentials list means
    'use the first entry'."""
    creds = credentials[0] if isinstance(credentials, list) else credentials
    return get_gateway_connection(zone.gateways[0], creds)
76 | ||
def mdlog_list(zone, period = None):
    """Return the zone's metadata log entries, optionally restricted to a
    single period."""
    cmd = ['mdlog', 'list']
    if period:
        cmd += ['--period', period]
    raw, _ = zone.cluster.admin(cmd, read_only=True)
    return json.loads(raw.decode('utf-8'))
84 | ||
def meta_sync_status(zone):
    # NOTE(review): dead code -- this definition is immediately shadowed by
    # the later meta_sync_status() below, which bounds the retries via
    # config.checkpoint_retries and parses the output. As written, this one
    # blocks until the admin command succeeds and then returns None.
    while True:
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break
        # the only tolerated failure is ENOENT while sync initializes
        assert(retcode == 2) # ENOENT
        time.sleep(5)
93 | ||
def mdlog_autotrim(zone):
    """Ask the zone's cluster to auto-trim its metadata log."""
    zone.cluster.admin(['mdlog', 'autotrim'])
96 | ||
def parse_meta_sync_status(meta_sync_status_json):
    """Parse `metadata sync status` JSON output.

    Returns (period, realm_epoch, num_shards, markers) where markers maps
    shard id -> marker string ('' for shards that are not yet incremental
    or that belong to an older realm epoch).
    """
    decoded = meta_sync_status_json.decode('utf-8')
    log.debug('current meta sync status=%s', decoded)
    sync_status = json.loads(decoded)

    info = sync_status['sync_status']['info']
    global_sync_status = info['status']  # unused, but validates the key exists
    num_shards = info['num_shards']
    period = info['period']
    realm_epoch = info['realm_epoch']

    sync_markers = sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {}
    for shard, entry in enumerate(sync_markers):
        val = entry['val']
        # only trust incremental markers belonging to the current realm epoch
        if realm_epoch > val['realm_epoch'] or val['state'] == 0:
            markers[shard] = ''
        else:
            markers[shard] = val['marker']

    return period, realm_epoch, num_shards, markers
121 | ||
def meta_sync_status(zone):
    """Poll `metadata sync status` on the zone until it succeeds, then parse it.

    Returns (period, realm_epoch, num_shards, markers); asserts after
    config.checkpoint_retries failed (ENOENT) attempts.
    """
    for _ in range(config.checkpoint_retries):
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_meta_sync_status(meta_sync_status_json)
        # the only tolerated failure is ENOENT while sync initializes
        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read metadata sync status for zone=%s' % zone.name
132 | ||
def meta_master_log_status(master_zone):
    """Return {shard_id: marker} positions of the master zone's metadata log."""
    cmd = ['mdlog', 'status'] + master_zone.zone_args()
    raw, _ = master_zone.cluster.admin(cmd, read_only=True)
    status = json.loads(raw.decode('utf-8'))

    markers = dict(enumerate(shard['marker'] for shard in status))
    log.debug('master meta markers=%s', markers)
    return markers
141 | ||
def compare_meta_status(zone, log_status, sync_status):
    """Return True if the zone's metadata sync markers have caught up with
    the master's log markers, shard by shard."""
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    behind = []
    for (shard, master_marker), zone_marker in zip(log_status.items(), sync_status.values()):
        if master_marker > zone_marker:
            behind.append('shard=' + str(shard) + ' master=' + master_marker + ' target=' + zone_marker)

    if behind:
        log.warning('zone %s behind master: %s', zone.name, ', '.join(behind))
        return False

    return True
159 | ||
def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
    """Block until `zone` has synced metadata up to the master's current
    log positions.

    meta_master_zone and master_status may be supplied by callers that
    checkpoint many zones, to avoid re-querying the master each time;
    otherwise both are computed here. Asserts after
    config.checkpoint_retries attempts.
    """
    if not meta_master_zone:
        meta_master_zone = zone.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    current_realm_epoch = realm.current_period.data['realm_epoch']

    log.info('starting meta checkpoint for zone=%s', zone.name)

    for _ in range(config.checkpoint_retries):
        period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
        # a zone still syncing an older realm epoch cannot be compared
        # against the current master log yet
        if realm_epoch < current_realm_epoch:
            log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
                        zone.name, realm_epoch, current_realm_epoch)
        else:
            log.debug('log_status=%s', master_status)
            log.debug('sync_status=%s', sync_status)
            if compare_meta_status(zone, master_status, sync_status):
                log.info('finish meta checkpoint for zone=%s', zone.name)
                return

        time.sleep(config.checkpoint_delay)
    assert False, 'failed meta checkpoint for zone=%s' % zone.name
184 | ||
def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
    """Run zone_meta_checkpoint() on every non-master zone in the zonegroup.

    The master's zone and log status are computed once and shared across
    all the per-zone checkpoints.
    """
    if not meta_master_zone:
        meta_master_zone = zonegroup.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    for zone in zonegroup.zones:
        # the master is trivially in sync with itself
        if zone == meta_master_zone:
            continue
        zone_meta_checkpoint(zone, meta_master_zone, master_status)
195 | ||
def realm_meta_checkpoint(realm):
    """Wait for every non-master zone in every zonegroup of the realm to
    finish metadata sync against the meta master."""
    log.info('meta checkpoint')

    meta_master_zone = realm.meta_master_zone()
    master_status = meta_master_log_status(meta_master_zone)

    for zonegroup in realm.current_period.zonegroups:
        zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)
204 | ||
def parse_data_sync_status(data_sync_status_json):
    """Parse `data sync status` JSON output into (num_shards, markers),
    where markers maps shard id -> marker string."""
    decoded = data_sync_status_json.decode('utf-8')
    log.debug('current data sync status=%s', decoded)
    sync_status = json.loads(decoded)

    info = sync_status['sync_status']['info']
    global_sync_status = info['status']  # unused, but validates the key exists
    num_shards = info['num_shards']

    sync_markers = sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {shard: sync_markers[shard]['val']['marker']
               for shard in range(num_shards)}

    return (num_shards, markers)
222 | ||
def data_sync_status(target_zone, source_zone):
    """Poll `data sync status` on target_zone until it succeeds, then parse it.

    Returns (num_shards, markers), or None when target and source are the
    same zone; asserts after config.checkpoint_retries ENOENT responses.
    """
    if target_zone == source_zone:
        return None

    for _ in range(config.checkpoint_retries):
        cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
        cmd += ['--source-zone', source_zone.name]
        data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_data_sync_status(data_sync_status_json)

        # the only tolerated failure is ENOENT while sync initializes
        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
            (target_zone.name, source_zone.name)
239 | ||
def bucket_sync_status(target_zone, source_zone, bucket_name):
    """Return {shard_key: position} of target_zone's sync of the bucket
    from source_zone (None when target == source).

    Shards still in full sync report an empty position; incremental shards
    report the inc_marker position with its shard-id prefix stripped.
    """
    if target_zone == source_zone:
        return None

    cmd = ['bucket', 'sync', 'status'] + target_zone.zone_args()
    cmd += ['--source-zone', source_zone.name]
    cmd += ['--bucket', bucket_name]
    # NOTE(review): unlike the other status pollers this retries ENOENT in a
    # tight loop with no sleep and no retry cap -- confirm that is intended
    while True:
        bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break

        assert(retcode == 2) # ENOENT

    bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
    log.debug('current bucket sync status=%s', bucket_sync_status_json)
    sync_status = json.loads(bucket_sync_status_json)

    markers={}
    for entry in sync_status:
        val = entry['val']
        if val['status'] == 'incremental-sync':
            pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
        else:
            pos = ''
        markers[entry['key']] = pos

    return markers
268 | ||
def data_source_log_status(source_zone):
    """Return {shard_id: marker} positions of the source zone's data
    changes log."""
    source_cluster = source_zone.cluster
    cmd = ['datalog', 'status'] + source_zone.zone_args()
    # consistency fix: use the same Cluster.admin() entry point as every
    # other radosgw-admin invocation in this module (this was the only
    # call site using 'rgw_admin')
    datalog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
    datalog_status = json.loads(datalog_status_json.decode('utf-8'))

    markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
    log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
    return markers
278 | ||
def bucket_source_log_status(source_zone, bucket_name):
    """Return {key: val} markers of the source zone's bucket index log."""
    cmd = ['bilog', 'status'] + source_zone.zone_args()
    cmd += ['--bucket', bucket_name]
    source_cluster = source_zone.cluster
    bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
    bilog_status = json.loads(bilog_status_json.decode('utf-8'))

    # the 'markers' section may be absent (or the whole payload may not be
    # an object); treat that as an empty log instead of swallowing every
    # exception with a bare except
    if isinstance(bilog_status, dict):
        m = bilog_status.get('markers', [])
    else:
        m = []

    markers = {s['key']: s['val'] for s in m}

    log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
    return markers
300 | ||
def compare_data_status(target_zone, source_zone, log_status, sync_status):
    """Return True if target_zone's data sync markers have reached
    source_zone's data log markers, shard by shard."""
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    behind = []
    for (shard, master_marker), target_marker in zip(log_status.items(), sync_status.values()):
        if master_marker > target_marker:
            behind.append('shard=' + str(shard) + ' master=' + master_marker + ' target=' + target_marker)

    if behind:
        log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, ', '.join(behind))
        return False

    return True
318 | ||
def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
    """Return True if target_zone's bucket sync markers have reached
    source_zone's bucket index log markers, shard by shard."""
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    behind = []
    for (shard, master_marker), target_marker in zip(log_status.items(), sync_status.values()):
        if master_marker > target_marker:
            behind.append('shard=' + str(shard) + ' master=' + master_marker + ' target=' + target_marker)

    if behind:
        log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, ', '.join(behind))
        return False

    return True
336 | ||
def zone_data_checkpoint(target_zone, source_zone):
    """Block until target_zone's data sync from source_zone catches up
    with the source's data changes log.

    Bug fix: the parameter was previously named 'source_zone_conn' while
    the body referenced 'source_zone', raising NameError on every call.
    Asserts after config.checkpoint_retries attempts.
    """
    if target_zone == source_zone:
        return

    log_status = data_source_log_status(source_zone)
    log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)

    for _ in range(config.checkpoint_retries):
        num_shards, sync_status = data_sync_status(target_zone, source_zone)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_data_status(target_zone, source_zone, log_status, sync_status):
            log.info('finished data checkpoint for target_zone=%s source_zone=%s',
                     target_zone.name, source_zone.name)
            return
        time.sleep(config.checkpoint_delay)

    assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
            (target_zone.name, source_zone.name)
358 | ||
359 | ||
def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
    """Block until target_zone's copy of the bucket catches up with
    source_zone's bucket index log.

    Asserts after config.checkpoint_retries attempts.
    """
    if target_zone == source_zone:
        return

    log_status = bucket_source_log_status(source_zone, bucket_name)
    log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)

    for _ in range(config.checkpoint_retries):
        sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
            log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
            return

        time.sleep(config.checkpoint_delay)

    # bug fix: the failure message previously said 'finished', masking
    # whether the checkpoint actually succeeded in test logs
    assert False, 'failed bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
            (target_zone.name, source_zone.name, bucket_name)
381 | ||
def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
    """Checkpoint the bucket between every ordered pair of distinct zones,
    then verify that bucket contents match."""
    for source_conn in zonegroup_conns.zones:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone != target_conn.zone:
                zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
                target_conn.check_bucket_eq(source_conn, bucket_name)
389 | ||
def set_master_zone(zone):
    """Promote `zone` to master of its zonegroup, commit a new period, and
    wait config.reconfigure_delay seconds for gateways to reconfigure."""
    zone.modify(zone.cluster, ['--master'])
    zonegroup = zone.zonegroup
    zonegroup.period.update(zone, commit=True)
    # keep the local model in sync with the committed period
    zonegroup.master_zone = zone
    log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
    time.sleep(config.reconfigure_delay)
397 | ||
def enable_bucket_sync(zone, bucket_name):
    """Re-enable multisite sync for the named bucket on the given zone."""
    zone.cluster.admin(
        ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args())
401 | ||
def disable_bucket_sync(zone, bucket_name):
    """Disable multisite sync for the named bucket on the given zone."""
    zone.cluster.admin(
        ['bucket', 'sync', 'disable', '--bucket', bucket_name] + zone.zone_args())
405 | ||
def check_buckets_sync_status_obj_not_exist(zone, buckets):
    """Wait until none of the named buckets appear in the zone's log
    listing; asserts after config.checkpoint_retries attempts."""
    for _ in range(config.checkpoint_retries):
        # NOTE(review): every other call site uses zone.zone_args() (plural);
        # confirm zone_arg() exists or this raises AttributeError
        cmd = ['log', 'list'] + zone.zone_arg()
        log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        # NOTE(review): admin() output is decoded from bytes at other call
        # sites before searching -- confirm find() gets the right type here
        for bucket in buckets:
            if log_list.find(':'+bucket+":") >= 0:
                break
        else:
            # no bucket matched anywhere in the log listing: success
            return
        time.sleep(config.checkpoint_delay)
    assert False
417 | ||
def gen_bucket_name():
    """Return a fresh bucket name, unique within this test run."""
    global num_buckets
    num_buckets += 1
    return '%s-%d' % (run_prefix, num_buckets)
423 | ||
class ZonegroupConns:
    """Per-zonegroup collection of zone connections for the configured
    test user, partitioned into read-only and writable zones."""
    def __init__(self, zonegroup):
        self.zonegroup = zonegroup
        self.zones = []       # all zone connections
        self.ro_zones = []    # read-only zones
        self.rw_zones = []    # writable zones
        self.master_zone = None  # connection to the zonegroup's master zone
        for z in zonegroup.zones:
            # `user` is the module global installed by init_multi()
            zone_conn = z.get_conn(user.credentials)
            self.zones.append(zone_conn)
            if z.is_read_only():
                self.ro_zones.append(zone_conn)
            else:
                self.rw_zones.append(zone_conn)

            if z == zonegroup.master_zone:
                self.master_zone = zone_conn
441 | ||
def check_all_buckets_exist(zone_conn, buckets):
    """Return True if every name in `buckets` is visible through zone_conn.

    Zones without bucket listings (has_buckets() is False) trivially pass.
    """
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except Exception:
            # bug fix: this previously logged 'zone.name' where no 'zone'
            # local exists, raising NameError instead of reporting failure
            log.critical('zone %s does not contain bucket %s', zone_conn.name, b)
            return False

    return True
454 | ||
def check_all_buckets_dont_exist(zone_conn, buckets):
    """Return True if none of the names in `buckets` is visible through
    zone_conn.

    Zones without bucket listings (has_buckets() is False) trivially pass.
    """
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except Exception:
            # lookup failed: this bucket is gone, as expected
            continue

        # bug fix: this previously logged the undefined name 'zone.zone',
        # raising NameError instead of reporting the leftover bucket
        log.critical('zone %s contains bucket %s', zone_conn.name, b)
        return False

    return True
469 | ||
def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1):
    """Create buckets_per_zone buckets on every writable zone.

    Returns (bucket_names, [(zone_conn, bucket), ...]).
    """
    buckets = []
    zone_bucket = []
    for zone in zonegroup_conns.rw_zones:
        # bug fix: xrange() does not exist on Python 3, which this module
        # already targets (see the izip_longest/zip_longest fallback above)
        for _ in range(buckets_per_zone):
            bucket_name = gen_bucket_name()
            log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
            bucket = zone.create_bucket(bucket_name)
            buckets.append(bucket_name)
            zone_bucket.append((zone, bucket))

    return buckets, zone_bucket
482 | ||
def create_bucket_per_zone_in_realm():
    """Create one bucket on every writable zone of every zonegroup in the
    realm; returns (bucket_names, [(zone_conn, bucket), ...])."""
    buckets = []
    zone_bucket = []
    for zonegroup in realm.current_period.zonegroups:
        zg_conn = ZonegroupConns(zonegroup)
        b, z = create_bucket_per_zone(zg_conn)
        buckets.extend(b)
        zone_bucket.extend(z)
    return buckets, zone_bucket
492 | ||
def test_bucket_create():
    """Buckets created on any zone should appear on every zone after
    metadata sync."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, _ = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)
501 | ||
def test_bucket_recreate():
    """Re-creating an existing bucket must succeed and must not remove it
    from any zone, before or after metadata sync."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, _ = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)


    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    # recreate buckets on all zones, make sure they weren't removed
    for zone in zonegroup_conns.rw_zones:
        for bucket_name in buckets:
            bucket = zone.create_bucket(bucket_name)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)
524 | ||
def test_bucket_remove():
    """Deleting a bucket on its home zone must propagate the deletion to
    every other zone after metadata sync."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    # NOTE(review): zone_bucket pairs are (zone_conn, bucket object) per
    # create_bucket_per_zone -- confirm delete_bucket() accepts a bucket
    # object here rather than a name
    for zone, bucket_name in zone_bucket:
        zone.conn.delete_bucket(bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_dont_exist(zone, buckets)
541 | ||
def get_bucket(zone, bucket_name):
    """Look up an existing bucket through the zone's cached connection."""
    return zone.conn.get_bucket(bucket_name)

def get_key(zone, bucket_name, obj_name):
    """Fetch the named key object from an existing bucket."""
    return get_bucket(zone, bucket_name).get_key(obj_name)

def new_key(zone, bucket_name, obj_name):
    """Create a new (not yet written) key object in an existing bucket."""
    return get_bucket(zone, bucket_name).new_key(obj_name)

def check_bucket_eq(zone_conn1, zone_conn2, bucket):
    """Verify that the bucket's contents on zone_conn2 match zone_conn1."""
    return zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
555 | ||
def test_object_sync():
    """Objects written to any zone (including awkward names like ':' and
    '&') must sync to every other zone."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = [ 'myobj', '_myobj', ':', '&' ]
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket_name in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket_name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    # every target zone must converge on each source zone's bucket
    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)
579 | ||
def test_object_delete():
    """An object deleted on its source zone must disappear from every
    target zone after a bucket checkpoint."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objname = 'myobj'
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket in zone_bucket:
        k = new_key(zone, bucket, objname)
        k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    # check object exists
    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

    # check object removal
    for source_conn, bucket in zone_bucket:
        k = get_key(source_conn, bucket, objname)
        k.delete()
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)
614 | ||
def get_latest_object_version(key):
    """Return the entry flagged is_latest from the bucket's version listing
    of this key, or None if no such entry exists."""
    versions = key.bucket.list_versions(key.name)
    return next((v for v in versions if v.is_latest), None)
620 | ||
def test_versioned_object_incremental_sync():
    """Interleaved create/delete of object versions on every zone must
    still converge on identical buckets everywhere."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # enable versioning
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)

    zonegroup_meta_checkpoint(zonegroup)

    # upload a dummy object to each bucket and wait for sync. this forces each
    # bucket to finish a full sync and switch to incremental
    for source_conn, bucket in zone_bucket:
        new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)

    for _, bucket in zone_bucket:
        # create and delete multiple versions of an object from each zone
        for zone_conn in zonegroup_conns.rw_zones:
            obj = 'obj-' + zone_conn.name
            k = new_key(zone_conn, bucket, obj)

            k.set_contents_from_string('version1')
            v = get_latest_object_version(k)
            log.debug('version1 id=%s', v.version_id)
            # don't delete version1 - this tests that the initial version
            # doesn't get squashed into later versions

            # create and delete the following object versions to test that
            # the operations don't race with each other during sync
            k.set_contents_from_string('version2')
            v = get_latest_object_version(k)
            log.debug('version2 id=%s', v.version_id)
            k.bucket.delete_key(obj, version_id=v.version_id)

            k.set_contents_from_string('version3')
            v = get_latest_object_version(k)
            log.debug('version3 id=%s', v.version_id)
            k.bucket.delete_key(obj, version_id=v.version_id)

    # verify all zones converged on the same version history
    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)
671 | ||
def test_bucket_versioning():
    """Enabling versioning must read back as 'Enabled' on each bucket."""
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)
        res = bucket.get_versioning_status()
        key = 'Versioning'
        assert(key in res and res[key] == 'Enabled')
679 | ||
def test_bucket_acl():
    """Setting the 'public-read' canned ACL must add a second grant."""
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
        bucket.set_acl('public-read')
        assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers
686 | ||
def test_bucket_cors():
    """A CORS configuration must be settable and read back identically."""
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        cors_cfg = CORSConfiguration()
        cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
        bucket.set_cors(cors_cfg)
        assert(bucket.get_cors().to_xml() == cors_cfg.to_xml())
694 | ||
def test_bucket_delete_notempty():
    """Deleting a bucket that still holds an unsynced object must fail with
    409 BucketNotEmpty, and the bucket must survive on the master."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    # NOTE(review): zone_bucket pairs are (zone_conn, bucket object) per
    # create_bucket_per_zone -- confirm get_bucket/delete_bucket accept a
    # bucket object here rather than a name
    for zone_conn, bucket_name in zone_bucket:
        # upload an object to each bucket on its own zone
        conn = zone_conn.get_connection()
        bucket = conn.get_bucket(bucket_name)
        k = bucket.new_key('foo')
        k.set_contents_from_string('bar')
        # attempt to delete the bucket before this object can sync
        try:
            conn.delete_bucket(bucket_name)
        except boto.exception.S3ResponseError as e:
            assert(e.error_code == 'BucketNotEmpty')
            continue
        assert False # expected 409 BucketNotEmpty

    # assert that each bucket still exists on the master
    c1 = zonegroup_conns.master_zone.conn
    for _, bucket_name in zone_bucket:
        assert c1.get_bucket(bucket_name)
719 | ||
def test_multi_period_incremental_sync():
    """Metadata written across three periods must still sync to a zone
    that was down for two of them, and mdlogs must match and then trim to
    empty across all zones."""
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.zones) < 3:
        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")

    # periods to include in mdlog comparison
    mdlog_periods = [realm.current_period.id]

    # create a bucket in each zone
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    zonegroup_meta_checkpoint(zonegroup)

    z1, z2, z3 = zonegroup.zones[0:3]
    assert(z1 == zonegroup.master_zone)

    # kill zone 3 gateways to freeze sync status to incremental in first period
    z3.stop()

    # change master to zone 2 -> period 2
    set_master_zone(z2)
    mdlog_periods += [realm.current_period.id]

    # create more buckets in period 2 (skipping the stopped zone 3)
    for zone_conn, _ in zone_bucket:
        if zone_conn.zone == z3:
            continue
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
        bucket = zone_conn.conn.create_bucket(bucket_name)
        buckets.append(bucket_name)

    # wait for zone 1 to sync
    zone_meta_checkpoint(z1)

    # change master back to zone 1 -> period 3
    set_master_zone(z1)
    mdlog_periods += [realm.current_period.id]

    # create more buckets in period 3 (zone 3 still down)
    for zone_conn, bucket_name in zone_bucket:
        if zone_conn.zone == z3:
            continue
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
        bucket = zone_conn.conn.create_bucket(bucket_name)
        buckets.append(bucket_name)

    # restart zone 3 gateway and wait for sync
    z3.start()
    zonegroup_meta_checkpoint(zonegroup)

    # verify that we end up with the same objects
    for bucket_name in buckets:
        for source_conn, _ in zone_bucket:
            for target_conn in zonegroup_conns.zones:
                if source_conn.zone == target_conn.zone:
                    continue

                target_conn.check_bucket_eq(source_conn, bucket_name)

    # verify that mdlogs are not empty and match for each period
    for period in mdlog_periods:
        master_mdlog = mdlog_list(z1, period)
        assert len(master_mdlog) > 0
        for zone in zonegroup.zones:
            if zone == z1:
                continue
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == len(master_mdlog)

    # autotrim mdlogs for master zone
    mdlog_autotrim(z1)

    # autotrim mdlogs for peers
    for zone in zonegroup.zones:
        if zone == z1:
            continue
        mdlog_autotrim(zone)

    # verify that mdlogs are empty for each period
    for period in mdlog_periods:
        for zone in zonegroup.zones:
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == 0
804 | ||
def test_zonegroup_remove():
    """Exercise 'zonegroup remove': add a throwaway zone, remove it, and
    verify that removing it a second time fails with ENOENT."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    if len(zonegroup.zones) < 2:
        raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")

    zonegroup_meta_checkpoint(zonegroup)
    first_zone, second_zone = zonegroup.zones[0:2]
    first_cluster = first_zone.cluster
    second_cluster = second_zone.cluster

    # stand up a temporary zone on the second cluster and commit the period
    temp_zone = Zone('remove', zonegroup, second_cluster)
    temp_zone.create(second_cluster)
    zonegroup.zones.append(temp_zone)
    zonegroup.period.update(temp_zone, commit=True)

    # first removal should succeed
    zonegroup.remove(first_cluster, temp_zone)

    # removing the same zone again must fail with ENOENT
    _, retcode = zonegroup.remove(first_cluster, temp_zone, check_retcode=False)
    assert(retcode == 2) # ENOENT

    # clean up the temporary zone
    temp_zone.delete(second_cluster)

    # commit a fresh period so the realm reflects the removal
    zonegroup.period.update(first_zone, commit=True)
832 | ||
def test_set_bucket_website():
    """Set a static-website configuration on one bucket per zone and verify
    it reads back as the XML we wrote.

    Skips when the gateway has static websites disabled (the PUT returns
    MethodNotAllowed, which requires rgw_enable_static_website = 1).
    """
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html')
        try:
            bucket.set_website_configuration(website_cfg)
        except boto.exception.S3ResponseError as e:
            if e.error_code == 'MethodNotAllowed':
                raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
            # fix: previously any other S3 error was silently swallowed and
            # the test fell through to the assert; surface it instead
            raise
        assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())
843 | ||
def test_set_bucket_policy():
    """Attach a bucket policy in each zone and verify it round-trips
    unchanged through get_policy()."""
    policy = '''{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": "*"
}]
}'''
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, target_bucket in zone_bucket:
        target_bucket.set_policy(policy)
        # the gateway should hand back exactly the document we stored
        assert(target_bucket.get_policy() == policy)
856 | ||
def test_bucket_sync_disable():
    """Disable sync on every test bucket, then confirm that no zone still
    holds a sync-status object for any of them."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # turn off sync per bucket on the metadata master
    for name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), name)

    # every zone must report no sync-status objects for these buckets
    for z in zonegroup.zones:
        check_buckets_sync_status_obj_not_exist(z, buckets)
867 | ||
def test_bucket_sync_enable_right_after_disable():
    """Disable and immediately re-enable bucket sync, then verify that
    objects written both before and after still converge across zones."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    content = 'asdasd'

    def upload_to_all(names):
        # write each named object into every zone's bucket
        for zone, bucket in zone_bucket:
            for objname in names:
                new_key(zone, bucket.name, objname).set_contents_from_string(content)

    def wait_for_bucket_sync():
        # block until every bucket has synced across the zonegroup
        for bucket_name in buckets:
            zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    upload_to_all(['obj1', 'obj2', 'obj3', 'obj4'])
    wait_for_bucket_sync()

    # back-to-back disable/enable should leave sync functional
    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    upload_to_all(['obj5', 'obj6', 'obj7', 'obj8'])
    wait_for_bucket_sync()
897 | ||
def test_bucket_sync_disable_enable():
    """Disable bucket sync, write more objects while it is off, re-enable,
    and verify all objects eventually converge across the zonegroup."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    content = 'asdasd'

    def upload_to_all(names):
        # write each named object into every zone's bucket
        for zone, bucket in zone_bucket:
            for objname in names:
                new_key(zone, bucket.name, objname).set_contents_from_string(content)

    upload_to_all(['obj1', 'obj2', 'obj3', 'obj4'])

    zonegroup_meta_checkpoint(zonegroup)

    # initial writes must sync everywhere first
    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    # switch sync off on every bucket
    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    # these objects land while sync is disabled
    upload_to_all(['obj5', 'obj6', 'obj7', 'obj8'])

    # switch sync back on
    for bucket_name in buckets:
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    # everything, including the offline writes, must now converge
    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)