]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/rgw_multi/tests.py
update sources to v12.2.3
[ceph.git] / ceph / src / test / rgw / rgw_multi / tests.py
1 import json
2 import random
3 import string
4 import sys
5 import time
6 import logging
7
8 try:
9 from itertools import izip_longest as zip_longest
10 except ImportError:
11 from itertools import zip_longest
12 from itertools import combinations
13 from cStringIO import StringIO
14
15 import boto
16 import boto.s3.connection
17 from boto.s3.website import WebsiteConfiguration
18 from boto.s3.cors import CORSConfiguration
19
20 from nose.tools import eq_ as eq
21 from nose.plugins.attrib import attr
22 from nose.plugins.skip import SkipTest
23
24 from .multisite import Zone
25
26 from .conn import get_gateway_connection
27
28 class Config:
29 """ test configuration """
30 def __init__(self, **kwargs):
31 # by default, wait up to 5 minutes before giving up on a sync checkpoint
32 self.checkpoint_retries = kwargs.get('checkpoint_retries', 60)
33 self.checkpoint_delay = kwargs.get('checkpoint_delay', 5)
34 # allow some time for realm reconfiguration after changing master zone
35 self.reconfigure_delay = kwargs.get('reconfigure_delay', 5)
36
37 # rgw multisite tests, written against the interfaces provided in rgw_multi.
38 # these tests must be initialized and run by another module that provides
39 # implementations of these interfaces by calling init_multi()
40 realm = None
41 user = None
42 config = None
43 def init_multi(_realm, _user, _config=None):
44 global realm
45 realm = _realm
46 global user
47 user = _user
48 global config
49 config = _config or Config()
50 realm_meta_checkpoint(realm)
51
52 def get_realm():
53 return realm
54
55 log = logging.getLogger(__name__)
56
57 num_buckets = 0
58 run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
59
60 def get_gateway_connection(gateway, credentials):
61 """ connect to the given gateway """
62 if gateway.connection is None:
63 gateway.connection = boto.connect_s3(
64 aws_access_key_id = credentials.access_key,
65 aws_secret_access_key = credentials.secret,
66 host = gateway.host,
67 port = gateway.port,
68 is_secure = False,
69 calling_format = boto.s3.connection.OrdinaryCallingFormat())
70 return gateway.connection
71
72 def get_zone_connection(zone, credentials):
73 """ connect to the zone's first gateway """
74 if isinstance(credentials, list):
75 credentials = credentials[0]
76 return get_gateway_connection(zone.gateways[0], credentials)
77
78 def mdlog_list(zone, period = None):
79 cmd = ['mdlog', 'list']
80 if period:
81 cmd += ['--period', period]
82 (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True)
83 mdlog_json = mdlog_json.decode('utf-8')
84 return json.loads(mdlog_json)
85
86 def meta_sync_status(zone):
87 while True:
88 cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
89 meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
90 if retcode == 0:
91 break
92 assert(retcode == 2) # ENOENT
93 time.sleep(5)
94
95 def mdlog_autotrim(zone):
96 zone.cluster.admin(['mdlog', 'autotrim'])
97
98 def bilog_list(zone, bucket, args = None):
99 cmd = ['bilog', 'list', '--bucket', bucket] + (args or [])
100 bilog, _ = zone.cluster.admin(cmd, read_only=True)
101 bilog = bilog.decode('utf-8')
102 return json.loads(bilog)
103
104 def bilog_autotrim(zone, args = None):
105 zone.cluster.admin(['bilog', 'autotrim'] + (args or []))
106
107 def parse_meta_sync_status(meta_sync_status_json):
108 meta_sync_status_json = meta_sync_status_json.decode('utf-8')
109 log.debug('current meta sync status=%s', meta_sync_status_json)
110 sync_status = json.loads(meta_sync_status_json)
111
112 sync_info = sync_status['sync_status']['info']
113 global_sync_status = sync_info['status']
114 num_shards = sync_info['num_shards']
115 period = sync_info['period']
116 realm_epoch = sync_info['realm_epoch']
117
118 sync_markers=sync_status['sync_status']['markers']
119 log.debug('sync_markers=%s', sync_markers)
120 assert(num_shards == len(sync_markers))
121
122 markers={}
123 for i in range(num_shards):
124 # get marker, only if it's an incremental marker for the same realm epoch
125 if realm_epoch > sync_markers[i]['val']['realm_epoch'] or sync_markers[i]['val']['state'] == 0:
126 markers[i] = ''
127 else:
128 markers[i] = sync_markers[i]['val']['marker']
129
130 return period, realm_epoch, num_shards, markers
131
132 def meta_sync_status(zone):
133 for _ in range(config.checkpoint_retries):
134 cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
135 meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
136 if retcode == 0:
137 return parse_meta_sync_status(meta_sync_status_json)
138 assert(retcode == 2) # ENOENT
139 time.sleep(config.checkpoint_delay)
140
141 assert False, 'failed to read metadata sync status for zone=%s' % zone.name
142
143 def meta_master_log_status(master_zone):
144 cmd = ['mdlog', 'status'] + master_zone.zone_args()
145 mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
146 mdlog_status = json.loads(mdlog_status_json.decode('utf-8'))
147
148 markers = {i: s['marker'] for i, s in enumerate(mdlog_status)}
149 log.debug('master meta markers=%s', markers)
150 return markers
151
152 def compare_meta_status(zone, log_status, sync_status):
153 if len(log_status) != len(sync_status):
154 log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
155 return False
156
157 msg = ''
158 for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
159 if l > s:
160 if len(msg):
161 msg += ', '
162 msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
163
164 if len(msg) > 0:
165 log.warning('zone %s behind master: %s', zone.name, msg)
166 return False
167
168 return True
169
170 def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
171 if not meta_master_zone:
172 meta_master_zone = zone.realm().meta_master_zone()
173 if not master_status:
174 master_status = meta_master_log_status(meta_master_zone)
175
176 current_realm_epoch = realm.current_period.data['realm_epoch']
177
178 log.info('starting meta checkpoint for zone=%s', zone.name)
179
180 for _ in range(config.checkpoint_retries):
181 period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
182 if realm_epoch < current_realm_epoch:
183 log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
184 zone.name, realm_epoch, current_realm_epoch)
185 else:
186 log.debug('log_status=%s', master_status)
187 log.debug('sync_status=%s', sync_status)
188 if compare_meta_status(zone, master_status, sync_status):
189 log.info('finish meta checkpoint for zone=%s', zone.name)
190 return
191
192 time.sleep(config.checkpoint_delay)
193 assert False, 'failed meta checkpoint for zone=%s' % zone.name
194
195 def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
196 if not meta_master_zone:
197 meta_master_zone = zonegroup.realm().meta_master_zone()
198 if not master_status:
199 master_status = meta_master_log_status(meta_master_zone)
200
201 for zone in zonegroup.zones:
202 if zone == meta_master_zone:
203 continue
204 zone_meta_checkpoint(zone, meta_master_zone, master_status)
205
206 def realm_meta_checkpoint(realm):
207 log.info('meta checkpoint')
208
209 meta_master_zone = realm.meta_master_zone()
210 master_status = meta_master_log_status(meta_master_zone)
211
212 for zonegroup in realm.current_period.zonegroups:
213 zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)
214
215 def parse_data_sync_status(data_sync_status_json):
216 data_sync_status_json = data_sync_status_json.decode('utf-8')
217 log.debug('current data sync status=%s', data_sync_status_json)
218 sync_status = json.loads(data_sync_status_json)
219
220 global_sync_status=sync_status['sync_status']['info']['status']
221 num_shards=sync_status['sync_status']['info']['num_shards']
222
223 sync_markers=sync_status['sync_status']['markers']
224 log.debug('sync_markers=%s', sync_markers)
225 assert(num_shards == len(sync_markers))
226
227 markers={}
228 for i in range(num_shards):
229 markers[i] = sync_markers[i]['val']['marker']
230
231 return (num_shards, markers)
232
233 def data_sync_status(target_zone, source_zone):
234 if target_zone == source_zone:
235 return None
236
237 for _ in range(config.checkpoint_retries):
238 cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
239 cmd += ['--source-zone', source_zone.name]
240 data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
241 if retcode == 0:
242 return parse_data_sync_status(data_sync_status_json)
243
244 assert(retcode == 2) # ENOENT
245 time.sleep(config.checkpoint_delay)
246
247 assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
248 (target_zone.name, source_zone.name)
249
250 def bucket_sync_status(target_zone, source_zone, bucket_name):
251 if target_zone == source_zone:
252 return None
253
254 cmd = ['bucket', 'sync', 'status'] + target_zone.zone_args()
255 cmd += ['--source-zone', source_zone.name]
256 cmd += ['--bucket', bucket_name]
257 while True:
258 bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
259 if retcode == 0:
260 break
261
262 assert(retcode == 2) # ENOENT
263
264 bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
265 log.debug('current bucket sync status=%s', bucket_sync_status_json)
266 sync_status = json.loads(bucket_sync_status_json)
267
268 markers={}
269 for entry in sync_status:
270 val = entry['val']
271 if val['status'] == 'incremental-sync':
272 pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
273 else:
274 pos = ''
275 markers[entry['key']] = pos
276
277 return markers
278
279 def data_source_log_status(source_zone):
280 source_cluster = source_zone.cluster
281 cmd = ['datalog', 'status'] + source_zone.zone_args()
282 datalog_status_json, retcode = source_cluster.rgw_admin(cmd, read_only=True)
283 datalog_status = json.loads(datalog_status_json.decode('utf-8'))
284
285 markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
286 log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
287 return markers
288
289 def bucket_source_log_status(source_zone, bucket_name):
290 cmd = ['bilog', 'status'] + source_zone.zone_args()
291 cmd += ['--bucket', bucket_name]
292 source_cluster = source_zone.cluster
293 bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
294 bilog_status = json.loads(bilog_status_json.decode('utf-8'))
295
296 m={}
297 markers={}
298 try:
299 m = bilog_status['markers']
300 except:
301 pass
302
303 for s in m:
304 key = s['key']
305 val = s['val']
306 markers[key] = val
307
308 log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
309 return markers
310
311 def compare_data_status(target_zone, source_zone, log_status, sync_status):
312 if len(log_status) != len(sync_status):
313 log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
314 return False
315
316 msg = ''
317 for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
318 if l > s:
319 if len(msg):
320 msg += ', '
321 msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
322
323 if len(msg) > 0:
324 log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg)
325 return False
326
327 return True
328
329 def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
330 if len(log_status) != len(sync_status):
331 log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
332 return False
333
334 msg = ''
335 for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
336 if l > s:
337 if len(msg):
338 msg += ', '
339 msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
340
341 if len(msg) > 0:
342 log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg)
343 return False
344
345 return True
346
347 def zone_data_checkpoint(target_zone, source_zone_conn):
348 if target_zone == source_zone:
349 return
350
351 log_status = data_source_log_status(source_zone)
352 log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)
353
354 for _ in range(config.checkpoint_retries):
355 num_shards, sync_status = data_sync_status(target_zone, source_zone)
356
357 log.debug('log_status=%s', log_status)
358 log.debug('sync_status=%s', sync_status)
359
360 if compare_data_status(target_zone, source_zone, log_status, sync_status):
361 log.info('finished data checkpoint for target_zone=%s source_zone=%s',
362 target_zone.name, source_zone.name)
363 return
364 time.sleep(config.checkpoint_delay)
365
366 assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
367 (target_zone.name, source_zone.name)
368
369
370 def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
371 if target_zone == source_zone:
372 return
373
374 log_status = bucket_source_log_status(source_zone, bucket_name)
375 log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
376
377 for _ in range(config.checkpoint_retries):
378 sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)
379
380 log.debug('log_status=%s', log_status)
381 log.debug('sync_status=%s', sync_status)
382
383 if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
384 log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
385 return
386
387 time.sleep(config.checkpoint_delay)
388
389 assert False, 'finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
390 (target_zone.name, source_zone.name, bucket_name)
391
392 def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
393 for source_conn in zonegroup_conns.zones:
394 for target_conn in zonegroup_conns.zones:
395 if source_conn.zone == target_conn.zone:
396 continue
397 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
398 target_conn.check_bucket_eq(source_conn, bucket_name)
399
400 def set_master_zone(zone):
401 zone.modify(zone.cluster, ['--master'])
402 zonegroup = zone.zonegroup
403 zonegroup.period.update(zone, commit=True)
404 zonegroup.master_zone = zone
405 log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
406 time.sleep(config.reconfigure_delay)
407
408 def enable_bucket_sync(zone, bucket_name):
409 cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args()
410 zone.cluster.admin(cmd)
411
412 def disable_bucket_sync(zone, bucket_name):
413 cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name] + zone.zone_args()
414 zone.cluster.admin(cmd)
415
416 def check_buckets_sync_status_obj_not_exist(zone, buckets):
417 for _ in range(config.checkpoint_retries):
418 cmd = ['log', 'list'] + zone.zone_arg()
419 log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
420 for bucket in buckets:
421 if log_list.find(':'+bucket+":") >= 0:
422 break
423 else:
424 return
425 time.sleep(config.checkpoint_delay)
426 assert False
427
428 def gen_bucket_name():
429 global num_buckets
430
431 num_buckets += 1
432 return run_prefix + '-' + str(num_buckets)
433
434 class ZonegroupConns:
435 def __init__(self, zonegroup):
436 self.zonegroup = zonegroup
437 self.zones = []
438 self.ro_zones = []
439 self.rw_zones = []
440 self.master_zone = None
441 for z in zonegroup.zones:
442 zone_conn = z.get_conn(user.credentials)
443 self.zones.append(zone_conn)
444 if z.is_read_only():
445 self.ro_zones.append(zone_conn)
446 else:
447 self.rw_zones.append(zone_conn)
448
449 if z == zonegroup.master_zone:
450 self.master_zone = zone_conn
451
452 def check_all_buckets_exist(zone_conn, buckets):
453 if not zone_conn.zone.has_buckets():
454 return True
455
456 for b in buckets:
457 try:
458 zone_conn.get_bucket(b)
459 except:
460 log.critical('zone %s does not contain bucket %s', zone.name, b)
461 return False
462
463 return True
464
465 def check_all_buckets_dont_exist(zone_conn, buckets):
466 if not zone_conn.zone.has_buckets():
467 return True
468
469 for b in buckets:
470 try:
471 zone_conn.get_bucket(b)
472 except:
473 continue
474
475 log.critical('zone %s contains bucket %s', zone.zone, b)
476 return False
477
478 return True
479
480 def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1):
481 buckets = []
482 zone_bucket = []
483 for zone in zonegroup_conns.rw_zones:
484 for i in xrange(buckets_per_zone):
485 bucket_name = gen_bucket_name()
486 log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
487 bucket = zone.create_bucket(bucket_name)
488 buckets.append(bucket_name)
489 zone_bucket.append((zone, bucket))
490
491 return buckets, zone_bucket
492
493 def create_bucket_per_zone_in_realm():
494 buckets = []
495 zone_bucket = []
496 for zonegroup in realm.current_period.zonegroups:
497 zg_conn = ZonegroupConns(zonegroup)
498 b, z = create_bucket_per_zone(zg_conn)
499 buckets.extend(b)
500 zone_bucket.extend(z)
501 return buckets, zone_bucket
502
503 def test_bucket_create():
504 zonegroup = realm.master_zonegroup()
505 zonegroup_conns = ZonegroupConns(zonegroup)
506 buckets, _ = create_bucket_per_zone(zonegroup_conns)
507 zonegroup_meta_checkpoint(zonegroup)
508
509 for zone in zonegroup_conns.zones:
510 assert check_all_buckets_exist(zone, buckets)
511
512 def test_bucket_recreate():
513 zonegroup = realm.master_zonegroup()
514 zonegroup_conns = ZonegroupConns(zonegroup)
515 buckets, _ = create_bucket_per_zone(zonegroup_conns)
516 zonegroup_meta_checkpoint(zonegroup)
517
518
519 for zone in zonegroup_conns.zones:
520 assert check_all_buckets_exist(zone, buckets)
521
522 # recreate buckets on all zones, make sure they weren't removed
523 for zone in zonegroup_conns.rw_zones:
524 for bucket_name in buckets:
525 bucket = zone.create_bucket(bucket_name)
526
527 for zone in zonegroup_conns.zones:
528 assert check_all_buckets_exist(zone, buckets)
529
530 zonegroup_meta_checkpoint(zonegroup)
531
532 for zone in zonegroup_conns.zones:
533 assert check_all_buckets_exist(zone, buckets)
534
535 def test_bucket_remove():
536 zonegroup = realm.master_zonegroup()
537 zonegroup_conns = ZonegroupConns(zonegroup)
538 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
539 zonegroup_meta_checkpoint(zonegroup)
540
541 for zone in zonegroup_conns.zones:
542 assert check_all_buckets_exist(zone, buckets)
543
544 for zone, bucket_name in zone_bucket:
545 zone.conn.delete_bucket(bucket_name)
546
547 zonegroup_meta_checkpoint(zonegroup)
548
549 for zone in zonegroup_conns.zones:
550 assert check_all_buckets_dont_exist(zone, buckets)
551
552 def get_bucket(zone, bucket_name):
553 return zone.conn.get_bucket(bucket_name)
554
555 def get_key(zone, bucket_name, obj_name):
556 b = get_bucket(zone, bucket_name)
557 return b.get_key(obj_name)
558
559 def new_key(zone, bucket_name, obj_name):
560 b = get_bucket(zone, bucket_name)
561 return b.new_key(obj_name)
562
563 def check_bucket_eq(zone_conn1, zone_conn2, bucket):
564 return zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
565
566 def test_object_sync():
567 zonegroup = realm.master_zonegroup()
568 zonegroup_conns = ZonegroupConns(zonegroup)
569 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
570
571 objnames = [ 'myobj', '_myobj', ':', '&' ]
572 content = 'asdasd'
573
574 # don't wait for meta sync just yet
575 for zone, bucket_name in zone_bucket:
576 for objname in objnames:
577 k = new_key(zone, bucket_name, objname)
578 k.set_contents_from_string(content)
579
580 zonegroup_meta_checkpoint(zonegroup)
581
582 for source_conn, bucket in zone_bucket:
583 for target_conn in zonegroup_conns.zones:
584 if source_conn.zone == target_conn.zone:
585 continue
586
587 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
588 check_bucket_eq(source_conn, target_conn, bucket)
589
590 def test_object_delete():
591 zonegroup = realm.master_zonegroup()
592 zonegroup_conns = ZonegroupConns(zonegroup)
593 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
594
595 objname = 'myobj'
596 content = 'asdasd'
597
598 # don't wait for meta sync just yet
599 for zone, bucket in zone_bucket:
600 k = new_key(zone, bucket, objname)
601 k.set_contents_from_string(content)
602
603 zonegroup_meta_checkpoint(zonegroup)
604
605 # check object exists
606 for source_conn, bucket in zone_bucket:
607 for target_conn in zonegroup_conns.zones:
608 if source_conn.zone == target_conn.zone:
609 continue
610
611 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
612 check_bucket_eq(source_conn, target_conn, bucket)
613
614 # check object removal
615 for source_conn, bucket in zone_bucket:
616 k = get_key(source_conn, bucket, objname)
617 k.delete()
618 for target_conn in zonegroup_conns.zones:
619 if source_conn.zone == target_conn.zone:
620 continue
621
622 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
623 check_bucket_eq(source_conn, target_conn, bucket)
624
625 def get_latest_object_version(key):
626 for k in key.bucket.list_versions(key.name):
627 if k.is_latest:
628 return k
629 return None
630
631 def test_versioned_object_incremental_sync():
632 zonegroup = realm.master_zonegroup()
633 zonegroup_conns = ZonegroupConns(zonegroup)
634 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
635
636 # enable versioning
637 for _, bucket in zone_bucket:
638 bucket.configure_versioning(True)
639
640 zonegroup_meta_checkpoint(zonegroup)
641
642 # upload a dummy object to each bucket and wait for sync. this forces each
643 # bucket to finish a full sync and switch to incremental
644 for source_conn, bucket in zone_bucket:
645 new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
646 for target_conn in zonegroup_conns.zones:
647 if source_conn.zone == target_conn.zone:
648 continue
649 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
650
651 for _, bucket in zone_bucket:
652 # create and delete multiple versions of an object from each zone
653 for zone_conn in zonegroup_conns.rw_zones:
654 obj = 'obj-' + zone_conn.name
655 k = new_key(zone_conn, bucket, obj)
656
657 k.set_contents_from_string('version1')
658 v = get_latest_object_version(k)
659 log.debug('version1 id=%s', v.version_id)
660 # don't delete version1 - this tests that the initial version
661 # doesn't get squashed into later versions
662
663 # create and delete the following object versions to test that
664 # the operations don't race with each other during sync
665 k.set_contents_from_string('version2')
666 v = get_latest_object_version(k)
667 log.debug('version2 id=%s', v.version_id)
668 k.bucket.delete_key(obj, version_id=v.version_id)
669
670 k.set_contents_from_string('version3')
671 v = get_latest_object_version(k)
672 log.debug('version3 id=%s', v.version_id)
673 k.bucket.delete_key(obj, version_id=v.version_id)
674
675 for source_conn, bucket in zone_bucket:
676 for target_conn in zonegroup_conns.zones:
677 if source_conn.zone == target_conn.zone:
678 continue
679 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
680 check_bucket_eq(source_conn, target_conn, bucket)
681
682 def test_bucket_versioning():
683 buckets, zone_bucket = create_bucket_per_zone_in_realm()
684 for _, bucket in zone_bucket:
685 bucket.configure_versioning(True)
686 res = bucket.get_versioning_status()
687 key = 'Versioning'
688 assert(key in res and res[key] == 'Enabled')
689
690 def test_bucket_acl():
691 buckets, zone_bucket = create_bucket_per_zone_in_realm()
692 for _, bucket in zone_bucket:
693 assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
694 bucket.set_acl('public-read')
695 assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers
696
697 def test_bucket_cors():
698 buckets, zone_bucket = create_bucket_per_zone_in_realm()
699 for _, bucket in zone_bucket:
700 cors_cfg = CORSConfiguration()
701 cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
702 bucket.set_cors(cors_cfg)
703 assert(bucket.get_cors().to_xml() == cors_cfg.to_xml())
704
705 def test_bucket_delete_notempty():
706 zonegroup = realm.master_zonegroup()
707 zonegroup_conns = ZonegroupConns(zonegroup)
708 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
709 zonegroup_meta_checkpoint(zonegroup)
710
711 for zone_conn, bucket_name in zone_bucket:
712 # upload an object to each bucket on its own zone
713 conn = zone_conn.get_connection()
714 bucket = conn.get_bucket(bucket_name)
715 k = bucket.new_key('foo')
716 k.set_contents_from_string('bar')
717 # attempt to delete the bucket before this object can sync
718 try:
719 conn.delete_bucket(bucket_name)
720 except boto.exception.S3ResponseError as e:
721 assert(e.error_code == 'BucketNotEmpty')
722 continue
723 assert False # expected 409 BucketNotEmpty
724
725 # assert that each bucket still exists on the master
726 c1 = zonegroup_conns.master_zone.conn
727 for _, bucket_name in zone_bucket:
728 assert c1.get_bucket(bucket_name)
729
730 def test_multi_period_incremental_sync():
731 zonegroup = realm.master_zonegroup()
732 if len(zonegroup.zones) < 3:
733 raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")
734
735 # periods to include in mdlog comparison
736 mdlog_periods = [realm.current_period.id]
737
738 # create a bucket in each zone
739 zonegroup_conns = ZonegroupConns(zonegroup)
740 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
741
742 zonegroup_meta_checkpoint(zonegroup)
743
744 z1, z2, z3 = zonegroup.zones[0:3]
745 assert(z1 == zonegroup.master_zone)
746
747 # kill zone 3 gateways to freeze sync status to incremental in first period
748 z3.stop()
749
750 # change master to zone 2 -> period 2
751 set_master_zone(z2)
752 mdlog_periods += [realm.current_period.id]
753
754 for zone_conn, _ in zone_bucket:
755 if zone_conn.zone == z3:
756 continue
757 bucket_name = gen_bucket_name()
758 log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
759 bucket = zone_conn.conn.create_bucket(bucket_name)
760 buckets.append(bucket_name)
761
762 # wait for zone 1 to sync
763 zone_meta_checkpoint(z1)
764
765 # change master back to zone 1 -> period 3
766 set_master_zone(z1)
767 mdlog_periods += [realm.current_period.id]
768
769 for zone_conn, bucket_name in zone_bucket:
770 if zone_conn.zone == z3:
771 continue
772 bucket_name = gen_bucket_name()
773 log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
774 bucket = zone_conn.conn.create_bucket(bucket_name)
775 buckets.append(bucket_name)
776
777 # restart zone 3 gateway and wait for sync
778 z3.start()
779 zonegroup_meta_checkpoint(zonegroup)
780
781 # verify that we end up with the same objects
782 for bucket_name in buckets:
783 for source_conn, _ in zone_bucket:
784 for target_conn in zonegroup_conns.zones:
785 if source_conn.zone == target_conn.zone:
786 continue
787
788 target_conn.check_bucket_eq(source_conn, bucket_name)
789
790 # verify that mdlogs are not empty and match for each period
791 for period in mdlog_periods:
792 master_mdlog = mdlog_list(z1, period)
793 assert len(master_mdlog) > 0
794 for zone in zonegroup.zones:
795 if zone == z1:
796 continue
797 mdlog = mdlog_list(zone, period)
798 assert len(mdlog) == len(master_mdlog)
799
800 # autotrim mdlogs for master zone
801 mdlog_autotrim(z1)
802
803 # autotrim mdlogs for peers
804 for zone in zonegroup.zones:
805 if zone == z1:
806 continue
807 mdlog_autotrim(zone)
808
809 # verify that mdlogs are empty for each period
810 for period in mdlog_periods:
811 for zone in zonegroup.zones:
812 mdlog = mdlog_list(zone, period)
813 assert len(mdlog) == 0
814
815 def test_zonegroup_remove():
816 zonegroup = realm.master_zonegroup()
817 zonegroup_conns = ZonegroupConns(zonegroup)
818 if len(zonegroup.zones) < 2:
819 raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")
820
821 zonegroup_meta_checkpoint(zonegroup)
822 z1, z2 = zonegroup.zones[0:2]
823 c1, c2 = (z1.cluster, z2.cluster)
824
825 # create a new zone in zonegroup on c2 and commit
826 zone = Zone('remove', zonegroup, c2)
827 zone.create(c2)
828 zonegroup.zones.append(zone)
829 zonegroup.period.update(zone, commit=True)
830
831 zonegroup.remove(c1, zone)
832
833 # another 'zonegroup remove' should fail with ENOENT
834 _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
835 assert(retcode == 2) # ENOENT
836
837 # delete the new zone
838 zone.delete(c2)
839
840 # validate the resulting period
841 zonegroup.period.update(z1, commit=True)
842
843 def test_set_bucket_website():
844 buckets, zone_bucket = create_bucket_per_zone_in_realm()
845 for _, bucket in zone_bucket:
846 website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html')
847 try:
848 bucket.set_website_configuration(website_cfg)
849 except boto.exception.S3ResponseError as e:
850 if e.error_code == 'MethodNotAllowed':
851 raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
852 assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())
853
854 def test_set_bucket_policy():
855 policy = '''{
856 "Version": "2012-10-17",
857 "Statement": [{
858 "Effect": "Allow",
859 "Principal": "*"
860 }]
861 }'''
862 buckets, zone_bucket = create_bucket_per_zone_in_realm()
863 for _, bucket in zone_bucket:
864 bucket.set_policy(policy)
865 assert(bucket.get_policy() == policy)
866
867 def test_bucket_sync_disable():
868 zonegroup = realm.master_zonegroup()
869 zonegroup_conns = ZonegroupConns(zonegroup)
870 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
871
872 for bucket_name in buckets:
873 disable_bucket_sync(realm.meta_master_zone(), bucket_name)
874
875 for zone in zonegroup.zones:
876 check_buckets_sync_status_obj_not_exist(zone, buckets)
877
878 def test_bucket_sync_enable_right_after_disable():
879 zonegroup = realm.master_zonegroup()
880 zonegroup_conns = ZonegroupConns(zonegroup)
881 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
882
883 objnames = ['obj1', 'obj2', 'obj3', 'obj4']
884 content = 'asdasd'
885
886 for zone, bucket in zone_bucket:
887 for objname in objnames:
888 k = new_key(zone, bucket.name, objname)
889 k.set_contents_from_string(content)
890
891 for bucket_name in buckets:
892 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
893
894 for bucket_name in buckets:
895 disable_bucket_sync(realm.meta_master_zone(), bucket_name)
896 enable_bucket_sync(realm.meta_master_zone(), bucket_name)
897
898 objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8']
899
900 for zone, bucket in zone_bucket:
901 for objname in objnames_2:
902 k = new_key(zone, bucket.name, objname)
903 k.set_contents_from_string(content)
904
905 for bucket_name in buckets:
906 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
907
908 def test_bucket_sync_disable_enable():
909 zonegroup = realm.master_zonegroup()
910 zonegroup_conns = ZonegroupConns(zonegroup)
911 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
912
913 objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ]
914 content = 'asdasd'
915
916 for zone, bucket in zone_bucket:
917 for objname in objnames:
918 k = new_key(zone, bucket.name, objname)
919 k.set_contents_from_string(content)
920
921 zonegroup_meta_checkpoint(zonegroup)
922
923 for bucket_name in buckets:
924 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
925
926 for bucket_name in buckets:
927 disable_bucket_sync(realm.meta_master_zone(), bucket_name)
928
929 zonegroup_meta_checkpoint(zonegroup)
930
931 objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ]
932
933 for zone, bucket in zone_bucket:
934 for objname in objnames_2:
935 k = new_key(zone, bucket.name, objname)
936 k.set_contents_from_string(content)
937
938 for bucket_name in buckets:
939 enable_bucket_sync(realm.meta_master_zone(), bucket_name)
940
941 for bucket_name in buckets:
942 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
943
944 def test_multipart_object_sync():
945 zonegroup = realm.master_zonegroup()
946 zonegroup_conns = ZonegroupConns(zonegroup)
947 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
948
949 _, bucket = zone_bucket[0]
950
951 # initiate a multipart upload
952 upload = bucket.initiate_multipart_upload('MULTIPART')
953 mp = boto.s3.multipart.MultiPartUpload(bucket)
954 mp.key_name = upload.key_name
955 mp.id = upload.id
956 part_size = 5 * 1024 * 1024 # 5M min part size
957 mp.upload_part_from_file(StringIO('a' * part_size), 1)
958 mp.upload_part_from_file(StringIO('b' * part_size), 2)
959 mp.upload_part_from_file(StringIO('c' * part_size), 3)
960 mp.upload_part_from_file(StringIO('d' * part_size), 4)
961 mp.complete_upload()
962
963 zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
964
965 def test_encrypted_object_sync():
966 zonegroup = realm.master_zonegroup()
967 zonegroup_conns = ZonegroupConns(zonegroup)
968
969 (zone1, zone2) = zonegroup_conns.rw_zones[0:2]
970
971 # create a bucket on the first zone
972 bucket_name = gen_bucket_name()
973 log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
974 bucket = zone1.conn.create_bucket(bucket_name)
975
976 # upload an object with sse-c encryption
977 sse_c_headers = {
978 'x-amz-server-side-encryption-customer-algorithm': 'AES256',
979 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
980 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
981 }
982 key = bucket.new_key('testobj-sse-c')
983 data = 'A'*512
984 key.set_contents_from_string(data, headers=sse_c_headers)
985
986 # upload an object with sse-kms encryption
987 sse_kms_headers = {
988 'x-amz-server-side-encryption': 'aws:kms',
989 # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
990 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
991 }
992 key = bucket.new_key('testobj-sse-kms')
993 key.set_contents_from_string(data, headers=sse_kms_headers)
994
995 # wait for the bucket metadata and data to sync
996 zonegroup_meta_checkpoint(zonegroup)
997 zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)
998
999 # read the encrypted objects from the second zone
1000 bucket2 = get_bucket(zone2, bucket_name)
1001 key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
1002 eq(data, key.get_contents_as_string(headers=sse_c_headers))
1003
1004 key = bucket2.get_key('testobj-sse-kms')
1005 eq(data, key.get_contents_as_string())
1006
1007 def test_bucket_index_log_trim():
1008 zonegroup = realm.master_zonegroup()
1009 zonegroup_conns = ZonegroupConns(zonegroup)
1010
1011 zone = zonegroup_conns.rw_zones[0]
1012
1013 # create a test bucket, upload some objects, and wait for sync
1014 def make_test_bucket():
1015 name = gen_bucket_name()
1016 log.info('create bucket zone=%s name=%s', zone.name, name)
1017 bucket = zone.conn.create_bucket(name)
1018 for objname in ('a', 'b', 'c', 'd'):
1019 k = new_key(zone, name, objname)
1020 k.set_contents_from_string('foo')
1021 zonegroup_meta_checkpoint(zonegroup)
1022 zonegroup_bucket_checkpoint(zonegroup_conns, name)
1023 return bucket
1024
1025 # create a 'cold' bucket
1026 cold_bucket = make_test_bucket()
1027
1028 # trim with max-buckets=0 to clear counters for cold bucket. this should
1029 # prevent it from being considered 'active' by the next autotrim
1030 bilog_autotrim(zone.zone, [
1031 '--rgw-sync-log-trim-max-buckets', '0',
1032 ])
1033
1034 # create an 'active' bucket
1035 active_bucket = make_test_bucket()
1036
1037 # trim with max-buckets=1 min-cold-buckets=0 to trim active bucket only
1038 bilog_autotrim(zone.zone, [
1039 '--rgw-sync-log-trim-max-buckets', '1',
1040 '--rgw-sync-log-trim-min-cold-buckets', '0',
1041 ])
1042
1043 # verify active bucket has empty bilog
1044 active_bilog = bilog_list(zone.zone, active_bucket.name)
1045 assert(len(active_bilog) == 0)
1046
1047 # verify cold bucket has nonempty bilog
1048 cold_bilog = bilog_list(zone.zone, cold_bucket.name)
1049 assert(len(cold_bilog) > 0)
1050
1051 # trim with min-cold-buckets=999 to trim all buckets
1052 bilog_autotrim(zone.zone, [
1053 '--rgw-sync-log-trim-max-buckets', '999',
1054 '--rgw-sync-log-trim-min-cold-buckets', '999',
1055 ])
1056
1057 # verify cold bucket has empty bilog
1058 cold_bilog = bilog_list(zone.zone, cold_bucket.name)
1059 assert(len(cold_bilog) == 0)